Wednesday, 7 September 2011

Weblogic JMS message receiver

Following on from an earlier post where I wrote a quick piece of Java to put files onto an arbitrary JMS topic/queue, I also wanted something to read text documents off a topic/queue. I wrote the following code, which takes a topic/queue and keeps reading messages off it, writing each text message to its own file; it also supports durable topic subscription IDs. Remember to include wlclient.jar and wljmsclient.jar in the classpath when building/running this (todo: mavenize all of these Weblogic libraries).
import javax.jms.*;
import javax.naming.*;
import java.util.Properties;
import java.io.*;

/**
 * Command-line JMS consumer: connects to a topic or queue looked up through
 * JNDI and writes every {@link TextMessage} it receives to its own file named
 * {@code <DestName>-<index>.txt} in the current working directory.
 *
 * <p>Messages that are not text messages are consumed and ignored.
 */
public class JMSConsumeMessage {

    /**
     * Entry point.
     *
     * <p>Arguments: {@code TOPIC|QUEUE ProviderURL UserName Password DestName [subscriptionName]}.
     * When {@code subscriptionName} is given for a topic it is used both as the
     * connection's client ID and as the durable subscription name.
     *
     * @param args command-line arguments as described above
     * @throws IllegalArgumentException if the destination type is neither TOPIC nor QUEUE
     * @throws Exception on any JNDI, JMS or file I/O failure
     */
    public static void main(String[] args) throws Exception {
        if (args.length < 5 || args.length > 6) {
            System.out.println ("Usage: TOPIC|QUEUE ProviderURL UserName Password DestName [subscriptionName]");
            return;
        }

        String destType = args[0];
        String provider = args[1];
        String username = args[2];
        String password = args[3];
        String destName = args[4];
        String subscriptionName = (args.length == 6) ? args[5] : null;

        boolean isTopic = destType.equals("TOPIC");
        if (!isTopic && !destType.equals("QUEUE")) {
            throw new IllegalArgumentException("Must specify destination as TOPIC or QUEUE");
        }

        // Work on a copy of the system properties so the credentials are not
        // published JVM-wide through System.getProperties().
        Properties props = new Properties();
        props.putAll(System.getProperties());
        props.put(Context.INITIAL_CONTEXT_FACTORY, "weblogic.jndi.WLInitialContextFactory");
        props.put(Context.PROVIDER_URL, provider);
        props.put(Context.SECURITY_PRINCIPAL, username);
        props.put(Context.SECURITY_CREDENTIALS, password);

        InitialContext ctx = new InitialContext(props);
        Connection connection = null;

        try {
            MessageConsumer consumer;

            if (isTopic) {
                TopicConnectionFactory tconfactory =
                        (TopicConnectionFactory) ctx.lookup("javax.jms.TopicConnectionFactory");
                TopicConnection tcon = tconfactory.createTopicConnection();
                // Track the connection immediately so it is closed even if a
                // later setup step throws.
                connection = tcon;
                if (subscriptionName != null) {
                    tcon.setClientID(subscriptionName);
                }
                TopicSession tsession = tcon.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
                Topic topic = (Topic) ctx.lookup(destName);
                if (subscriptionName == null) {
                    consumer = tsession.createSubscriber(topic);
                } else {
                    consumer = tsession.createDurableSubscriber(topic, subscriptionName);
                }
            } else {
                QueueConnectionFactory qconfactory =
                        (QueueConnectionFactory) ctx.lookup("javax.jms.QueueConnectionFactory");
                QueueConnection qcon = qconfactory.createQueueConnection();
                connection = qcon;
                QueueSession qsession = qcon.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
                Queue queue = (Queue) ctx.lookup(destName);
                consumer = qsession.createReceiver(queue);
            }

            connection.start();

            int currentIndex = 0;

            while (true) {
                Message m = consumer.receive();
                if (m == null) {
                    // receive() returns null once the consumer has been closed;
                    // stop instead of spinning on a dead consumer.
                    break;
                }
                if (m instanceof TextMessage) {
                    // destName is used verbatim as the file-name prefix, so a
                    // name containing '/' refers to a subdirectory that must
                    // already exist.
                    String fileName = destName + "-" + currentIndex + ".txt";
                    writeTextFile(fileName, ((TextMessage) m).getText());
                    System.out.println("Read message to: " + fileName);
                    currentIndex++;
                }
            }
        } finally {
            try {
                // Closing the connection also closes its sessions and consumers.
                if (connection != null) {
                    connection.close();
                }
            } finally {
                ctx.close();
            }
        }
    }

    /**
     * Writes {@code text} to {@code fileName}, always releasing the file handle.
     *
     * @param fileName path of the file to create or overwrite
     * @param text     message body; {@code null} (a text message with no body)
     *                 produces an empty file
     * @throws IOException if the file cannot be opened or written
     */
    private static void writeTextFile(String fileName, String text) throws IOException {
        BufferedWriter writer = new BufferedWriter(new FileWriter(fileName));
        try {
            if (text != null) {
                writer.write(text);
            }
        } finally {
            writer.close();
        }
    }

}

No comments:

Post a Comment