The Routing is used to define where the messages should be routed, or not.
The multicast keywords defines a static list of outputs where the message is sent.
val routeDef = new RouteBuilder {
from("direct:input").as[String].
//received messages are sent to those three mock endpoints
multicast("mock:output1", "mock:output2", "mock:output3").
to("mock:output4")
}
The recipientList is like the multicast keyword, but the list can be dynamic and calculated at runtime.
val routeDef = new RouteBuilder {
from("direct:input").as[String].
//received messages targets are dynamically defined by the headers of each message
recipientList(m => m.headers("recipients")).
to("mock:output4")
}
The filter and filterBody keywords filter message with a predicate.
In this example, the predicate is a function taking a message and returning a boolean.
import io.xtech.babel.camel.builder.RouteBuilder
val routeDef = new RouteBuilder {
//direct:input receives one message with
// "1,2,3,true,4,5,6,7"
from("direct:input").as[String]
//the splitBody creates 8 messages with string body
// by a function which returns a string iterator
.splitBody(body => body.split(",").iterator)
//the filter will only let continue the one whose body is "true"
.filter(msg => msg.body.exists(body => body == "true")).
to("mock:output")
}
import io.xtech.babel.camel.builder.RouteBuilder
val routeDef = new RouteBuilder {
//direct:input receives one message with
// "1,2,3,true,4,5,6,7"
from("direct:input").as[String]
//the splitBody creates 8 messages with string body
// by a function which returns a string iterator
.splitBody(body => body.split(",").iterator)
//the filter will only let continue the one whose body is "true"
.filterBody(body => body == "true").
to("mock:output1").
to("mock:output2")
}
Note
Contrary to Apache Camel, Babel does not provide the end keyword. After the filter (or filterBody) keyword, only accepted message are processed by next EIPs. If you want to process also the other message, you may dispatch your message to another part of route, with a multicast or a choice for example.
The choice keyword gives you a way to choose where you are sending the message.
You configures a choice with when, whenBody and otherwise keywords. Each when accepts a predicate. In this example the predicates are function taking message and returning a boolean.
val routeDef = new RouteBuilder {
from("direct:babel").as[String].choice {
c =>
c.when(msg => msg.body == Some("1")).
processBody(body => body + "done").to("mock:output1")
c.when(msg => msg.body == Some("2")).
processBody(body => body + "done").to("mock:output2")
c.when(msg => msg.body == Some("3")).
processBody(body => body + "done").to("mock:output3")
c.otherwise.
processBody(body => body + "done").
to("mock:output4")
}
.to("mock:output5")
The split keyword is the way to split a message in pieces, the splitBody does the same directly on the message body.
import io.xtech.babel.camel.builder.RouteBuilder
val routeDef = new RouteBuilder {
from("direct:input").as[String]
//split the message base on its body using the comma
.splitBody(_.split(",").iterator)
//which creates several messages
.filter(msg => msg.body.exists(body => body.contains("true")))
//required as keyword to recover the type
.to("mock:output").as[String]
//split the message base on its body using the spaces
.splitBody(_.split(" ").iterator)
.filter(msg => msg.body.exists(body => body == "false")).to("mock:false")
}
In this example the splitting is done with a function which takes the message body and returns an Iterator.
The splitReduceBody and the splitFoldBody define higher-level EIPs: The possibility to split a content, apply a modification on each part of the content and then merge those parts into a new content. The two keywords differs in their way to merge the final content. They are inspired by the AggregationStrategy
that are defined in the Aggregation
part.
The splitReduceBody let you define a simple aggregation which does not change the type during the aggregation:
val routeDef = new RouteBuilder {
//message bodies are converted to String if required
from("direct:babel").as[String].splitReduceBody(_.split(",").iterator) {
_.to("mock:babel1").requireAs[String].processBody(_.toInt + 1 + "").to("mock:babel2").requireAs[String]
}((x, y) => s"$x,$y").
processBody(x => x).
to("mock:babel3")
}
The splitFoldBody let you define a more complexe aggregation which does change the type during the aggregation:
val routeDef = new RouteBuilder {
//message bodies are converted to String if required
from("direct:babel").as[String].splitFoldBody(_.split(",").iterator) {
_.to("mock:babel1").requireAs[String].processBody(_.toInt + 1).to("mock:babel2").requireAs[Int]
}("1")((x, y) => s"$x,$y").
processBody(x => x).
to("mock:babel3")
}
An aggregation is a way to combine several messages in a new message. An aggregation is declared with :
The DSL contains some default implementations we will show :
// inputs (1,2,3,4,5,6,7,8,9) -> outputs (6,15,24)
val reduceBody = ReduceBody(
//defines how message bodies are aggregated
reduce = (a: Int, b: Int) => a + b,
//defines when message may be aggregated
groupBy = (msg: Message[Int]) => "a",
//defines the size of the aggregation (3 messages)
completionStrategies = List(CompletionSize(3)))
import io.xtech.babel.camel.builder.RouteBuilder
val routeDef = new RouteBuilder {
from("direct:babel").as[Int].
aggregate(reduceBody).
to("mock:output")
}
// inputs (1,2,3,4,5,6,7,8,9) -> outputs ("123","456","789")
val foldBody = FoldBody("",
//defines how message bodies are aggregated
(a: String, b: Int) => a + b,
//defines when message may be aggregated
(msg: Message[Int]) => "a",
//defines the size of the aggregation (3 messages)
completionStrategies = List(CompletionSize(3)))
import io.xtech.babel.camel.builder.RouteBuilder
val routeDef = new RouteBuilder {
from("direct:babel").as[Int].
aggregate(foldBody).
to("mock:output")
}
import io.xtech.babel.camel.builder.RouteBuilder
import org.apache.camel.processor.aggregate.GroupedExchangeAggregationStrategy
val camelAggr = CamelAggregation(MessageExpression((msg: Message[String]) => "1"),
aggregationStrategy = new GroupedExchangeAggregationStrategy,
completionStrategies = List(CompletionSize(3)))
val routeDef = new RouteBuilder {
//message bodies are converted to String if required
from("direct:input").as[String].
//aggregates strings based on the camelAggr defined higher
aggregate(camelAggr).
//sends the aggregated string to the mock endpoint
to("mock:output")
}
//defines when message should be aggregated
val camelExp = new MessageExpression((a: Message[String]) => "1")
import io.xtech.babel.camel.model.Aggregation.CamelReferenceAggregation
val camelAggr = CamelReferenceAggregation[String, String](
correlationExpression = camelExp,
//defines the string id of the aggregation strategy in the bean registry
"aggregationStrategy",
completionStrategies = List(CompletionSize(3)))
val routeDef = new RouteBuilder {
//message bodies are converted to String if required
from("direct:input").as[String].
//aggregates strings based on the camelAggr defined higher
aggregate(camelAggr).
//sends the aggregated string to the mock endpoint
to("mock:output")
}
The wiretap keyword is the way to route messages to another location while they keep beeing process by the regular flow.
from("direct:input-babel").
//Incoming messages are sent to the direct endpoint
// and to the next mock endpoint
wiretap("direct:babel-tap")
.to("mock:output-babel")
The validate keyword validates messages passing through a route using a function or a Camel predicate.
A message will be valid only if the expression or function is returning true. Otherwise, an exception is thrown.
val routeBuilder = new RouteBuilder {
from("direct:input").as[Int].
validate(Builder.body().isEqualTo(1)).
to("mock:output")
}
val routeBuilder = new RouteBuilder {
from("direct:input").as[Int].
validate(msg => msg.body == Some(1)).
to("mock:output")
}
val routeBuilder = new RouteBuilder {
from("direct:input").as[Int].
validateBody(body => body == 1).
to("mock:output")
}