Routing keywords define where messages are sent, and whether they are sent at all.
The multicast keyword defines a static list of outputs to which each message is sent.
val routeDef = new RouteBuilder {
from("direct:input").as[String].
//received messages are sent to those three mock endpoints
multicast("mock:output1", "mock:output2", "mock:output3").
to("mock:output4")
}
The recipientList keyword is like multicast, but the list of recipients is dynamic and computed at runtime.
val routeDef = new RouteBuilder {
from("direct:input").as[String].
//the targets of each message are defined dynamically by its "recipients" header
recipientList(m => m.headers("recipients")).
to("mock:output4")
}
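The difference between the two keywords can be sketched in plain Scala. This is only a model of the behaviour described above, not the Babel Camel API: `Msg`, `multicast`, `recipientList` and `fromHeader` are illustrative names, and the comma-separated format of the "recipients" header is an assumption.

```scala
object RecipientSketch {
  // Illustrative message type: a body plus string headers
  final case class Msg(body: String, headers: Map[String, String] = Map.empty)

  // multicast: the target list is fixed when the route is defined
  def multicast(targets: String*)(msg: Msg): List[String] = targets.toList

  // recipientList: the target list is computed from each message at runtime
  def recipientList(compute: Msg => List[String])(msg: Msg): List[String] =
    compute(msg)

  // Assumption: the "recipients" header holds comma-separated endpoint URIs
  val fromHeader: Msg => List[String] =
    m => m.headers.get("recipients").toList.flatMap(_.split(",").toList).map(_.trim)
}
```

With this model, `multicast("mock:a", "mock:b")` returns the same targets for every message, while `recipientList(fromHeader)` returns whatever the message's own header names.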
The filter and filterBody keywords filter messages with a predicate.
In the first example, the predicate is a function that takes a message and returns a boolean; in the second, filterBody applies the predicate directly to the message body.
import io.xtech.babel.camel.builder.RouteBuilder
val routeDef = new RouteBuilder {
//direct:input receives one message with
// "1,2,3,true,4,5,6,7"
from("direct:input").as[String]
//the splitBody creates 8 messages with string body
// by a function which returns a string iterator
.splitBody(body => body.split(",").iterator)
//the filter only lets through the message whose body is "true"
.filter(msg => msg.body.exists(body => body == "true")).
to("mock:output")
}
import io.xtech.babel.camel.builder.RouteBuilder
val routeDef = new RouteBuilder {
//direct:input receives one message with
// "1,2,3,true,4,5,6,7"
from("direct:input").as[String]
//the splitBody creates 8 messages with string body
// by a function which returns a string iterator
.splitBody(body => body.split(",").iterator)
//the filter only lets through the message whose body is "true"
.filterBody(body => body == "true").
to("mock:output")
}
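To see why both routes let exactly one message through, the same steps can be modelled with plain Scala collections. This is a sketch under assumptions: `Msg`, `splitBody`, `filter` and `filterBody` below are stand-ins that mimic the keywords, and dropping messages without a body in `filterBody` is a choice made for this model only.

```scala
object FilterSketch {
  // Illustrative message type with an optional body, as in msg.body.exists(...)
  final case class Msg(body: Option[String])

  // splitBody: one incoming body becomes one message per comma-separated piece
  def splitBody(input: String): List[Msg] =
    input.split(",").toList.map(piece => Msg(Some(piece)))

  // filter: the predicate sees the whole message
  def filter(msgs: List[Msg])(p: Msg => Boolean): List[Msg] = msgs.filter(p)

  // filterBody: the predicate sees only the body
  // (assumption of this model: a message without a body is dropped)
  def filterBody(msgs: List[Msg])(p: String => Boolean): List[Msg] =
    msgs.filter(msg => msg.body.exists(p))

  val pieces: List[Msg] = splitBody("1,2,3,true,4,5,6,7")
  val viaFilter: List[Msg] = filter(pieces)(msg => msg.body.exists(b => b == "true"))
  val viaFilterBody: List[Msg] = filterBody(pieces)(b => b == "true")
}
```

Here `pieces` holds eight messages, and both `viaFilter` and `viaFilterBody` keep only the one whose body is "true".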
The choice keyword lets you choose where a message is sent.
You configure a choice with the when, whenBody and otherwise keywords. Each when accepts a predicate. In this example the predicates are functions that take a message and return a boolean.
val routeDef = new RouteBuilder {
from("direct:babel").as[String].choice {
c =>
c.when(msg => msg.body == Some("1")).
processBody(body => body + "done").to("mock:output1")
c.when(msg => msg.body == Some("2")).
processBody(body => body + "done").to("mock:output2")
c.when(msg => msg.body == Some("3")).
processBody(body => body + "done").to("mock:output3")
c.otherwise.
processBody(body => body + "done").
to("mock:output4")
}
.to("mock:output5")
}
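The branching in this route can be pictured as a pattern match. The sketch below is plain Scala, not the Babel Camel API; `route` is a hypothetical helper. It assumes that the first matching when wins and that, as in Camel's choice, every message continues to the step after the choice block.

```scala
object ChoiceSketch {
  // Returns the transformed body and the endpoints the message would visit
  def route(body: String): (String, List[String]) = {
    val branch = body match {
      case "1" => "mock:output1"
      case "2" => "mock:output2"
      case "3" => "mock:output3"
      case _   => "mock:output4" // plays the role of otherwise
    }
    // every branch appends "done"; the route then continues after the choice
    (body + "done", List(branch, "mock:output5"))
  }
}
```

For example, `ChoiceSketch.route("2")` yields the body "2done" and the endpoints mock:output2 and mock:output5, while any unmatched body takes the mock:output4 branch.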
The split keyword splits a message into pieces; splitBody does the same directly on the message body.
import io.xtech.babel.camel.builder.RouteBuilder
val routeDef = new RouteBuilder {
from("direct:input").as[String]
//split the message based on its body, using the comma as separator
.splitBody(_.split(",").iterator)
//which creates several messages
.filter(msg => msg.body.exists(body => body.contains("true")))
//the as keyword is required to recover the type
.to("mock:output").as[String]
//split the message based on its body, using spaces as separator
.splitBody(_.split(" ").iterator)
.filter(msg => msg.body.exists(body => body == "false")).to("mock:false")
}
In this example the splitting is done with a function which takes the message body and returns an Iterator.
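The two split stages of this route can be traced on plain strings. The sketch below models message bodies as a `List[String]`; `splitBody` is an illustrative helper, not the Babel Camel keyword, and the sample input is made up for the illustration.

```scala
object SplitSketch {
  // Each body is replaced by the pieces produced by the splitting function
  def splitBody(bodies: List[String])(f: String => Iterator[String]): List[String] =
    bodies.flatMap(f)

  // Hypothetical input, chosen so that both filters have something to drop
  val input: List[String] = List("1 true,2 false,3 true false")

  // First stage: split on commas, keep the pieces containing "true"
  val firstStage: List[String] =
    splitBody(input)(_.split(",").iterator).filter(_.contains("true"))

  // Second stage: split on spaces, keep the pieces equal to "false"
  val secondStage: List[String] =
    splitBody(firstStage)(_.split(" ").iterator).filter(_ == "false")
}
```

In this model `firstStage` is what would reach mock:output and `secondStage` is what would reach mock:false.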
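An aggregation combines several messages into a new message. As the examples in this section show, an aggregation is declared with a way to combine messages, an expression that decides which messages are grouped together, and one or more completion strategies.
The DSL provides several default implementations: ReduceBody, FoldBody, CamelAggregation and CamelReferenceAggregation.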
// inputs (1,2,3,4,5,6,7,8,9) -> outputs (6,15,24)
val reduceBody = ReduceBody(
//defines how message bodies are aggregated
reduce = (a: Int, b: Int) => a + b,
//defines which messages are aggregated together (here every message gets the key "a")
groupBy = (msg: Message[Int]) => "a",
//defines the size of the aggregation (3 messages)
completionStrategies = List(CompletionSize(3)))
import io.xtech.babel.camel.builder.RouteBuilder
val routeDef = new RouteBuilder {
from("direct:babel").as[Int].
aggregate(reduceBody).
to("mock:output")
}
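The input and output values in the comment above can be checked with a plain-Scala model of a size-based reduce. This is a sketch of the semantics only, not of how ReduceBody is implemented: `reduceBySize` is a hypothetical helper, and leaving out incomplete groups reflects the assumption that a group is emitted only once it reaches the completion size.

```scala
object ReduceSketch {
  // Groups values by key, then reduces every complete run of `size` values
  def reduceBySize[A, K](inputs: List[A], size: Int)(key: A => K)(reduce: (A, A) => A): List[A] =
    inputs.groupBy(key).values.toList.flatMap { group =>
      group
        .grouped(size)            // consecutive runs of at most `size` values
        .filter(_.size == size)   // keep complete runs only
        .map(_.reduce(reduce))
        .toList
    }

  // Same key for every value and completion size 3, as in the route above
  val outputs: List[Int] = reduceBySize((1 to 9).toList, 3)(_ => "a")(_ + _)
}
```

With the inputs 1 to 9, `outputs` is `List(6, 15, 24)`, matching the comment in the ReduceBody example.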
// inputs (1,2,3,4,5,6,7,8,9) -> outputs ("123","456","789")
val foldBody = FoldBody("",
//defines how message bodies are aggregated
(a: String, b: Int) => a + b,
//defines which messages are aggregated together (here every message gets the key "a")
(msg: Message[Int]) => "a",
//defines the size of the aggregation (3 messages)
completionStrategies = List(CompletionSize(3)))
import io.xtech.babel.camel.builder.RouteBuilder
val routeDef = new RouteBuilder {
from("direct:babel").as[Int].
aggregate(foldBody).
to("mock:output")
}
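A fold differs from a reduce in that it starts from a seed and may produce a result of a different type than the inputs. The sketch below models that with `foldLeft` in plain Scala; `foldBySize` is a hypothetical helper, not part of the DSL, and the grouping key is omitted because every message shares one key in the example above.

```scala
object FoldSketch {
  // Folds every complete run of `size` inputs, starting each run from `seed`
  def foldBySize[A, B](inputs: List[A], size: Int)(seed: B)(fold: (B, A) => B): List[B] =
    inputs
      .grouped(size)
      .filter(_.size == size)
      .map(_.foldLeft(seed)(fold))
      .toList

  // Int inputs are folded into String outputs, starting from the empty string
  val outputs: List[String] =
    foldBySize((1 to 9).toList, 3)("")((acc, n) => acc + n)
}
```

With the inputs 1 to 9, `outputs` is `List("123", "456", "789")`, matching the comment in the FoldBody example.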
import io.xtech.babel.camel.builder.RouteBuilder
import org.apache.camel.processor.aggregate.GroupedExchangeAggregationStrategy
val camelAggr = CamelAggregation(MessageExpression((msg: Message[String]) => "1"),
aggregationStrategy = new GroupedExchangeAggregationStrategy,
completionStrategies = List(CompletionSize(3)))
val routeDef = new RouteBuilder {
//message bodies are converted to String if required
from("direct:input").as[String].
//aggregates strings based on the camelAggr defined above
aggregate(camelAggr).
//sends the aggregated string to the mock endpoint
to("mock:output")
}
//defines which messages should be aggregated together
val camelExp = new MessageExpression((a: Message[String]) => "1")
import io.xtech.babel.camel.model.Aggregation.CamelReferenceAggregation
val camelAggr = CamelReferenceAggregation[String, String](
correlationExpression = camelExp,
//defines the string id of the aggregation strategy in the bean registry
"aggregationStrategy",
completionStrategies = List(CompletionSize(3)))
val routeDef = new RouteBuilder {
//message bodies are converted to String if required
from("direct:input").as[String].
//aggregates strings based on the camelAggr defined above
aggregate(camelAggr).
//sends the aggregated string to the mock endpoint
to("mock:output")
}
The wiretap keyword routes messages to another location while they continue to be processed by the regular flow.
val routeDef = new RouteBuilder {
from("direct:input-babel").
//incoming messages are sent to the direct endpoint
// and to the next mock endpoint
wiretap("direct:babel-tap")
.to("mock:output-babel")
}
The validate keyword validates messages passing through a route using a function or a Camel predicate.
A message is valid only if the expression or function returns true; otherwise, an exception is thrown.
//assumption: Builder is Camel's expression builder
import org.apache.camel.builder.Builder
val routeBuilder = new RouteBuilder {
from("direct:input").as[Int].
validate(Builder.body().isEqualTo(1)).
to("mock:output")
}
val routeBuilder = new RouteBuilder {
from("direct:input").as[Int].
validate(msg => msg.body == Some(1)).
to("mock:output")
}
val routeBuilder = new RouteBuilder {
from("direct:input").as[Int].
validateBody(body => body == 1).
to("mock:output")
}
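The pass-or-throw behaviour of validation can be sketched in plain Scala. The names below (`Msg`, `ValidationFailed`, `validate`, `validateBody`) are illustrative stand-ins, not the Babel Camel API, and the exception type actually thrown by the library is not specified here.

```scala
object ValidateSketch {
  import scala.util.Try

  // Illustrative message type with an optional Int body
  final case class Msg(body: Option[Int])

  // Hypothetical exception type used only by this sketch
  final class ValidationFailed(msg: Msg)
    extends RuntimeException(s"validation failed for $msg")

  // validate: pass the message on if the predicate holds, otherwise throw
  def validate(p: Msg => Boolean)(msg: Msg): Msg =
    if (p(msg)) msg else throw new ValidationFailed(msg)

  // validateBody: same check, with the predicate applied to the body
  def validateBody(p: Int => Boolean)(msg: Msg): Msg =
    validate(m => m.body.exists(p))(msg)

  val accepted = Try(validate(m => m.body == Some(1))(Msg(Some(1))))
  val rejected = Try(validateBody(b => b == 1)(Msg(Some(2))))
}
```

In this model `accepted` is a `Success` carrying the original message and `rejected` is a `Failure` wrapping the exception.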