durable c consumer for activemq

Thu, Aug 20, 2009

I’ve ported the C++ STOMP code to C# with just a little problem, in that the STOMP support in ActiveMQ NMS doesn’t work. No to worry, the default protocol does. The first thing you need is ActiveMQ NMS (.NET Messaging). I used the 1.1.0 source release downloaded from here. You’ll also need nant to build it. Unzip nms-1.1.0 to NMS_SRC_HOME and make a quick change to nant-common.xml otherwise you’ll get this error:

Unknown function ‘platform::is-windows()‘
open nant-common.xml and replace is-windows() with is-win32(). Found that solution here.

then just run “nant” and it’ll build everything. You can also open NMS_SRC_HOME/vs2008-activemq.sln and build from there once you’ve built from nant. To use the libraries you need to Add References to:

NMS_SRC_HOME/build/net-2.0/debug/Apache.NMS.ActiveMQ.dll
NMS_SRC_HOME/build/net-2.0/debug/Apache.NMS.dll
and copy these files where your .exe can find them:
NMS_SRC_HOME/build/net-2.0/debug/nmsprovider-activemq.config
NMS_SRC_HOME/build/net-2.0/debug/nmsprovider-tcp.config
or you’ll get implementation not found errors. Here’s the code to create a durable consumer in C# using the NMS libraries:
public void readTopic() {
  Uri connectURI = new Uri(“activemq:tcp://host.com:61616”);
  ConnectionFactory connectionFactory = new ConnectionFactory(connectURI);
  IConnection connection = connectionFactory.CreateConnection();
  connection.ClientId = “DurableClientId”;
  connection.Start();
  connection.ExceptionListener += new ExceptionListener(OnException);
  ISession session = connection.CreateSession(
AcknowledgementMode.DupsOkAcknowledge);
  ActiveMQTopic topic = new ActiveMQTopic(“test.t”);
  IMessageConsumer consumer = session.CreateDurableConsumer(
topic, “DurableClientId”, “2 > 1”, false);
  consumer.Listener += new MessageListener(OnMessage);
  Console.WriteLine(“Press any key to exit…”);
  Console.ReadKey();
}

public void OnMessage(IMessage receivedMsg) { IBytesMessage message = receivedMsg as IBytesMessage; string s = System.Text.UTF8Encoding.UTF8.GetString(message.Content); Console.WriteLine(s); }

public void OnException(Exception e) { Console.WriteLine(e.Message); }

The ruby code I wrote is just a simple producer which causes ActiveMQ to create ActiveMQBytesMessage instances instead of ActiveMQTextMessage ones. If you’re sending text messages just ammend OnMessage accordingly.

You can monitor the topics using jconsole:

service:jmx:rmi:///jndi/rmi://localhost:1099/jmxrmi

comments powered by Disqus