Babel Camel Routing

The routing keywords define where messages are sent, and whether they are sent at all.

Multicast

The multicast keyword defines a static list of outputs to which each message is sent.

val routeDef = new RouteBuilder {
  from("direct:input").as[String].
    //received messages are sent to those three mock endpoints
    multicast("mock:output1", "mock:output2", "mock:output3").
    to("mock:output4")
}

Recipient list

The recipientList keyword works like multicast, but the list of recipients can be dynamic and is computed at runtime.

val routeDef = new RouteBuilder {
  from("direct:input").as[String].
    //the targets of each received message are defined dynamically by its headers
    recipientList(m => m.headers("recipients")).
    to("mock:output4")
}
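The difference between a static and a dynamic list of targets can be sketched in plain Scala, without the DSL. This is only an illustrative model: Msg, staticTargets and dynamicTargets are hypothetical names, and the comma-separated format of the "recipients" header is an assumption made for the example.

```scala
object RecipientSketch {
  // Hypothetical stand-in for a message: headers and a body only.
  final case class Msg(headers: Map[String, String], body: String)

  // Static list, fixed when the route is defined (multicast-like).
  val staticTargets: List[String] =
    List("mock:output1", "mock:output2", "mock:output3")

  // Dynamic list, computed for each message from its "recipients" header
  // (recipientList-like). The comma-separated format is an assumption.
  def dynamicTargets(msg: Msg): List[String] =
    msg.headers.get("recipients").toList
      .flatMap(_.split(",").toList)
      .map(_.trim)
      .filter(_.nonEmpty)

  def main(args: Array[String]): Unit =
    println(dynamicTargets(Msg(Map("recipients" -> "mock:a, mock:b"), "hello")))
}
```

A message without the header yields an empty list in this model; how the DSL itself treats a missing header is not covered here.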

Filter

The filter and filterBody keywords filter messages with a predicate.

In these examples, the predicate is a function returning a boolean: filter takes the whole message, while filterBody takes only the message body.

import io.xtech.babel.camel.builder.RouteBuilder

val routeDef = new RouteBuilder {
  //direct:input receives one message with
  //   "1,2,3,true,4,5,6,7"
  from("direct:input").as[String]
    //the splitBody creates 8 messages with string body
    //   by a function which returns a string iterator
    .splitBody(body => body.split(",").iterator)
    //the filter only lets through the message whose body is "true"
    .filter(msg => msg.body.exists(body => body == "true")).
    to("mock:output")
}

import io.xtech.babel.camel.builder.RouteBuilder

val routeDef = new RouteBuilder {
  //direct:input receives one message with
  //   "1,2,3,true,4,5,6,7"
  from("direct:input").as[String]
    //the splitBody creates 8 messages with string body
    //   by a function which returns a string iterator
    .splitBody(body => body.split(",").iterator)
    //the filter only lets through the message whose body is "true"
    .filterBody(body => body == "true").
    to("mock:output")
}
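The effect of the split and filter steps on the sample body can be checked with plain Scala collections. This is a minimal sketch of the data flow only, not of the DSL itself; FilterSketch and its members are hypothetical names.

```scala
object FilterSketch {
  // The single incoming body used in the examples above.
  val input: String = "1,2,3,true,4,5,6,7"

  // splitBody step: one body becomes eight string pieces.
  val pieces: List[String] = input.split(",").toList

  // filterBody step: keep only the pieces whose body is "true".
  val kept: List[String] = pieces.filter(body => body == "true")

  def main(args: Array[String]): Unit =
    println(s"${pieces.size} pieces, kept: $kept")
}
```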

Choice

The choice keyword gives you a way to choose where you are sending the message.

You configure a choice with the when, whenBody and otherwise keywords. Each when accepts a predicate. In this example, the predicates are functions taking a message and returning a boolean.

val routeDef = new RouteBuilder {
  from("direct:babel").as[String].choice {
    c =>
      c.when(msg => msg.body == Some("1")).
        processBody(body => body + "done").to("mock:output1")
      c.when(msg => msg.body == Some("2")).
        processBody(body => body + "done").to("mock:output2")
      c.when(msg => msg.body == Some("3")).
        processBody(body => body + "done").to("mock:output3")
      c.otherwise.
        processBody(body => body + "done").
        to("mock:output4")
  }
    .to("mock:output5")
}
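The branch selection in the route above behaves much like a pattern match on the body. The following plain Scala sketch models only that selection and the processBody step; ChoiceSketch, Routed and route are hypothetical names, and the sketch does not model the final mock:output5 endpoint.

```scala
object ChoiceSketch {
  // Hypothetical result type: the processed body and the endpoint of the chosen branch.
  final case class Routed(body: String, endpoint: String)

  // The first matching case wins; the default case plays the role of otherwise.
  def route(body: String): Routed = {
    val endpoint = body match {
      case "1" => "mock:output1"
      case "2" => "mock:output2"
      case "3" => "mock:output3"
      case _   => "mock:output4"
    }
    // Every branch appends "done" to the body, as processBody does in the route.
    Routed(body + "done", endpoint)
  }

  def main(args: Array[String]): Unit =
    List("1", "2", "3", "other").map(route).foreach(println)
}
```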

Splitter

The split keyword splits a message into pieces. The splitBody keyword does the same directly on the message body.

import io.xtech.babel.camel.builder.RouteBuilder

val routeDef = new RouteBuilder {
  from("direct:input").as[String]
    //split the message based on its body, using the comma as separator
    .splitBody(_.split(",").iterator)
    //which creates several messages
    .filter(msg => msg.body.exists(body => body.contains("true")))
    //the as keyword is required to recover the type
    .to("mock:output").as[String]
    //split the message based on its body, using the space as separator
    .splitBody(_.split(" ").iterator)
    .filter(msg => msg.body.exists(body => body == "false")).to("mock:false")
}

In this example the splitting is done with a function which takes the message body and returns an Iterator.
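The two split stages can be traced on a sample body with plain Scala collections. This is a sketch of the data flow under an assumed input; the body "true false,other,true" is chosen for illustration and does not come from the original example.

```scala
object SplitSketch {
  // Hypothetical input body, chosen for illustration only.
  val input: String = "true false,other,true"

  // First stage: split on commas, keep the pieces containing "true".
  // These are the bodies that would reach mock:output.
  val firstStage: List[String] =
    input.split(",").toList.filter(_.contains("true"))

  // Second stage: split each remaining piece on spaces, keep the pieces equal to "false".
  // These are the bodies that would reach mock:false.
  val secondStage: List[String] =
    firstStage.flatMap(_.split(" ").toList).filter(_ == "false")

  def main(args: Array[String]): Unit = {
    println(firstStage)
    println(secondStage)
  }
}
```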

Aggregation

An aggregation combines several messages into a new message. An aggregation is declared by answering three questions:

  • How do you combine the messages?
  • How do you group the messages?
  • When is the aggregation complete?
    • Once a given number of messages has been aggregated? (CompletionSize)
    • After a period of time? (CompletionInterval)
    • Or a combination of both?

The DSL provides the following default implementations:

  • Reduce combines messages of the same type and creates a new message of that type.
  • Fold takes a seed, combines each message with it, and creates a new message with the type of the seed.
  • CamelAggregation and CamelReferenceAggregation (from the io.xtech.babel.camel.model package) define an aggregation using Camel-specific vocabulary.

Reduce

// inputs (1,2,3,4,5,6,7,8,9) -> outputs (6,15,24)
val reduceBody = ReduceBody(
  //defines how message bodies are aggregated
  reduce = (a: Int, b: Int) => a + b,
  //defines how messages are grouped (here, all messages share the same key)
  groupBy = (msg: Message[Int]) => "a",
  //defines the size of the aggregation (3 messages)
  completionStrategies = List(CompletionSize(3)))

import io.xtech.babel.camel.builder.RouteBuilder

val routeDef = new RouteBuilder {
  from("direct:babel").as[Int].
    aggregate(reduceBody).
    to("mock:output")
}
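The inputs and outputs given in the comment above can be reproduced with plain Scala collections. This sketch assumes the nine messages arrive in order and all share one group; ReduceSketch is a hypothetical name and the code does not use the DSL.

```scala
object ReduceSketch {
  // The nine incoming bodies, in arrival order.
  val inputs: List[Int] = (1 to 9).toList

  // CompletionSize(3): every three messages complete one aggregation.
  // reduce: the bodies inside each group are combined with a + b.
  val outputs: List[Int] =
    inputs.grouped(3).map(group => group.reduce((a, b) => a + b)).toList

  def main(args: Array[String]): Unit =
    println(outputs)
}
```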

Fold

// inputs (1,2,3,4,5,6,7,8,9) -> outputs ("123","456","789")
val foldBody = FoldBody("",
  //defines how message bodies are aggregated
  (a: String, b: Int) => a + b,
  //defines how messages are grouped (here, all messages share the same key)
  (msg: Message[Int]) => "a",
  //defines the size of the aggregation (3 messages)
  completionStrategies = List(CompletionSize(3)))

import io.xtech.babel.camel.builder.RouteBuilder

val routeDef = new RouteBuilder {
  from("direct:babel").as[Int].
    aggregate(foldBody).
    to("mock:output")
}
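The same check can be made for the fold, where the result takes the type of the seed rather than the type of the inputs. As before, this is a plain Scala sketch that assumes ordered arrival and a single group; FoldSketch is a hypothetical name.

```scala
object FoldSketch {
  // The nine incoming bodies, in arrival order.
  val inputs: List[Int] = (1 to 9).toList

  // CompletionSize(3): every three messages complete one aggregation.
  // fold: starting from the seed "", each Int body is appended to the String.
  val outputs: List[String] =
    inputs.grouped(3).map(group => group.foldLeft("")((a, b) => a + b)).toList

  def main(args: Array[String]): Unit =
    println(outputs)
}
```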

Camel Aggregation

import io.xtech.babel.camel.builder.RouteBuilder
import org.apache.camel.processor.aggregate.GroupedExchangeAggregationStrategy
val camelAggr = CamelAggregation(MessageExpression((msg: Message[String]) => "1"),
  aggregationStrategy = new GroupedExchangeAggregationStrategy,
  completionStrategies = List(CompletionSize(3)))

val routeDef = new RouteBuilder {
  //message bodies are converted to String if required
  from("direct:input").as[String].
    //aggregates strings based on the camelAggr defined above
    aggregate(camelAggr).
    //sends the aggregated string to the mock endpoint
    to("mock:output")
}

//defines how messages are grouped (here, all messages share the same key)
val camelExp = new MessageExpression((a: Message[String]) => "1")

import io.xtech.babel.camel.model.Aggregation.CamelReferenceAggregation
val camelAggr = CamelReferenceAggregation[String, String](
  correlationExpression = camelExp,
  //defines the string id of the aggregation strategy in the bean registry
  "aggregationStrategy",
  completionStrategies = List(CompletionSize(3)))

val routeDef = new RouteBuilder {
  //message bodies are converted to String if required
  from("direct:input").as[String].
    //aggregates strings based on the camelAggr defined above
    aggregate(camelAggr).
    //sends the aggregated string to the mock endpoint
    to("mock:output")
}

Wire-Tap

The wiretap keyword routes messages to another location while they continue to be processed by the regular flow.

from("direct:input-babel").
  //Incoming messages are sent to the direct endpoint
  //   and to the next mock endpoint
  wiretap("direct:babel-tap")
  .to("mock:output-babel")

Validate

The validate keyword validates messages passing through a route using a function or a Camel predicate.

A message is valid only if the predicate or function returns true. Otherwise, an exception is thrown.

Camel Predicate

import io.xtech.babel.camel.builder.RouteBuilder
import org.apache.camel.builder.Builder

val routeBuilder = new RouteBuilder {

  from("direct:input").as[Int].
    validate(Builder.body().isEqualTo(1)).
    to("mock:output")
}

Message Function

val routeBuilder = new RouteBuilder {

  from("direct:input").as[Int].
    validate(msg => msg.body == Some(1)).
    to("mock:output")
}

Body Function

val routeBuilder = new RouteBuilder {

  from("direct:input").as[Int].
    validateBody(body => body == 1).
    to("mock:output")
}
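The pass-or-throw behaviour described in this section can be modelled in a few lines of plain Scala. This is only a sketch: the validateBody helper below is hypothetical and unrelated to the DSL keyword of the same name, and IllegalArgumentException stands in for whatever exception the route actually raises.

```scala
object ValidateSketch {
  // Hypothetical helper: returns the body unchanged when the predicate holds
  // and throws otherwise. The exception type is illustrative only.
  def validateBody[A](body: A)(predicate: A => Boolean): A =
    if (predicate(body)) body
    else throw new IllegalArgumentException(s"Validation failed for body: $body")

  def main(args: Array[String]): Unit = {
    // A valid body passes through unchanged.
    println(validateBody(1)(_ == 1))
    // An invalid body results in a failure.
    println(scala.util.Try(validateBody(2)(_ == 1)).isFailure)
  }
}
```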