Camel ActiveMQ topic route with jms selector

Here’s a quick example how to create a camel route that creates a route from an ActiveMQ topic. It uses a selector to receive a subset of all messages.

import org.apache.camel.builder.RouteBuilder;

public class SubscriberRouteBuilder extends RouteBuilder {
    public String resultEndpoint = "mock:result";
    @Override
    public void configure() throws Exception {
        from("activemq:topic:STOCKS?clientId=1&durableSubscriptionName=dsn1&selector=SE='NASDAQ'")
                .to("log:stocks?showAll=true")
                .to(resultEndpoint);
    }

}

To test this you should start ActiveMQ in the unit test. The test itself uses a mock endpoint to specify how many messages you expect, and what the contents of the messages should be.

import java.util.Arrays;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.broker.BrokerService;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.test.junit4.CamelTestSupport;
import org.junit.BeforeClass;
import org.junit.Test;

public class SubscriberRouteBuilderTest extends CamelTestSupport {
    private String rep = "mock:result";
    
    @Override
    protected RouteBuilder createRouteBuilder() throws Exception {
        SubscriberRouteBuilder srb = new SubscriberRouteBuilder();
        srb.resultEndpoint = rep;
        return new SubscriberRouteBuilder();
    }

    private static BrokerService brokerSvc = null;

    @BeforeClass
    public static void setupClass() throws Exception {
        brokerSvc = new BrokerService();
        brokerSvc.setBrokerName("TestBroker");
        brokerSvc.addConnector("tcp://localhost:61616");
        brokerSvc.start();
    }

    @AfterClass
    public static void shutdownClass() throws Exception {
        brokerSvc.stop();
    }

    @Test
    public void testConfigure() throws Exception {
        String[] nasdaqStocks = new String[]{"AAPL=350.56","ORCL=33.68","CSCO=18.85","GOOG=630.08","AAPL=350.10"};
        getMockEndpoint(rep).expectedMessageCount(5);
        getMockEndpoint(rep).expectedBodiesReceived(Arrays.asList(nasdaqStocks));
        template.sendBodyAndHeader("activemq:topic:STOCKS", nasdaqStocks[0], "SE", "NASDAQ");
        template.sendBodyAndHeader("activemq:topic:STOCKS", "HD=38.48", "SE", "DOWJONES");
        template.sendBodyAndHeader("activemq:topic:STOCKS", nasdaqStocks[1], "SE", "NASDAQ");
        template.sendBodyAndHeader("activemq:topic:STOCKS", "HD=38.00", "SE", "DOWJONES");
        template.sendBodyAndHeader("activemq:topic:STOCKS", nasdaqStocks[2], "SE", "NASDAQ");
        template.sendBodyAndHeader("activemq:topic:STOCKS", nasdaqStocks[3], "SE", "NASDAQ");
        template.sendBodyAndHeader("activemq:topic:STOCKS", nasdaqStocks[4], "SE", "NASDAQ");
        assertMockEndpointsSatisfied(10, TimeUnit.SECONDS);
    }
}
blog comments powered by Disqus