Babel Camel Routing

The routing keywords define where messages are sent, and whether they are routed at all.

Multicast

The multicast keyword defines a static list of outputs to which each message is sent.

val routeDef = new RouteBuilder {
  from("direct:input").as[String].
    //received messages are sent to those three mock endpoints
    multicast("mock:output1", "mock:output2", "mock:output3").
    processBody(_ => "tested").
    to("mock:output4").
    processBody(_ + " by babel").
    to("mock:output5")

}

You may configure the multicast with an aggregation strategy to define how to merge messages that are output by the multicast’s branches.

val aggregation = new ReduceBodyAggregationStrategy[String]((x, y) => x)

val routeDef = new RouteBuilder {
  from("direct:input").as[String].
    //received messages are sent to those three mock endpoints
    multicast("mock:output1", "mock:output2", "mock:output3").withAggregation(aggregation).
    to("mock:output4").
    processBody(_ + " by babel").
    to("mock:output5")

}

Recipient list

The recipientList keyword is like the multicast keyword, but the list of outputs is dynamic and computed at runtime.

val routeDef = new RouteBuilder {
  from("direct:input").as[String].
    //the targets of each received message are defined by its "recipients" header
    recipientList(m => m.headers("recipients")).
    to("mock:output4")
}

Filter

The filter and filterBody keywords filter messages with a predicate.

The predicate may be a function taking a message and returning a boolean.

Note

Contrary to Apache Camel, Babel does not provide the end keyword. After the filter (or filterBody) keyword, only accepted messages are processed by the next EIPs. If you also want to process the other messages, you may dispatch them to another part of the route, for example with a multicast or a choice.
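
The behaviour described in the note can be modelled in plain Scala, without Babel or Camel on the classpath. This is only a sketch: the Msg case class and the filterModel helper are hypothetical names introduced for illustration and are not part of the Babel API.

```scala
object FilterModel {
  // Hypothetical, minimal stand-in for a message: just an optional body.
  final case class Msg[A](body: Option[A])

  // Models filter: only messages accepted by the predicate reach the next step.
  def filterModel[A](input: List[Msg[A]])(accept: Msg[A] => Boolean): List[Msg[A]] =
    input.filter(accept)

  val input: List[Msg[String]] = List(Msg(Some("1")), Msg(Some("2")), Msg(None))

  // Rejected messages are dropped; nothing after the filter sees them again.
  val accepted: List[Msg[String]] = filterModel(input)(_.body == Some("1"))

  // To handle the other messages too, they have to be routed separately,
  // which is the role a choice or a multicast would play in a real route.
  val (matching, others) = input.partition(_.body == Some("1"))
}
```

In this model, accepted contains only the message whose body is "1", while others holds the two messages a plain filter would have discarded.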

Choice

The choice keyword gives you a way to choose where you are sending the message.

You configure a choice with the when, whenBody and otherwise keywords. Each when accepts a predicate. In this example the predicates are functions taking a message and returning a boolean.

val routeDef = new RouteBuilder {
  from("direct:babel").as[String].choice {
    c =>
      c.when(msg => msg.body == Some("1")).
        processBody(body => body + "done").to("mock:output1")
      c.when(msg => msg.body == Some("2")).
        processBody(body => body + "done").to("mock:output2")
      c.when(msg => msg.body == Some("3")).
        processBody(body => body + "done").to("mock:output3")
      c.otherwise.
        processBody(body => body + "done").
        to("mock:output4")
  }
    .to("mock:output5")
}

Splitter

The split keyword splits a message into pieces. The splitBody keyword does the same directly on the message body.

For splitBody, the splitting is done with a function which takes the message body and returns an Iterator.

The splitReduceBody and splitFoldBody keywords define higher-level EIPs: they split a content, apply a modification to each part, and then merge those parts into a new content. The two keywords differ in the way they merge the final content. They are inspired by the aggregation strategies defined in the Aggregation part.

The splitReduceBody keyword lets you define a simple aggregation which does not change the type during the aggregation:

val routeDef = new RouteBuilder {
  //message bodies are converted to String if required
  from("direct:babel").as[String].splitReduceBody(_.split(",").iterator) {
    _.to("mock:babel1").requireAs[String]
      .processBody(_.toInt + 1 + "")
      .to("mock:babel2").requireAs[String]
  }((x, y) => s"$x,$y").
    processBody(x => x).
    to("mock:babel3")
}
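
The data flow of the splitReduceBody route above can be approximated with plain Scala collections. This is a sketch of the semantics only: the splitReduce function is a hypothetical name, the mock endpoints are left out, and no Babel or Camel API is involved.

```scala
object SplitReduceModel {
  def splitReduce(body: String): String =
    body.split(",").iterator                  // split the body into parts
      .map(part => (part.toInt + 1).toString) // modify each part; it stays a String
      .reduce((x, y) => s"$x,$y")             // merge the parts, same type in and out

  val result: String = splitReduce("1,2,3")   // "2,3,4"
}
```

Because reduce needs no seed, the merged value necessarily has the same type as the parts, which is why this form cannot change the type.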

The splitFoldBody keyword lets you define a more complex aggregation which does change the type during the aggregation:

val routeDef = new RouteBuilder {
  //message bodies are converted to String if required
  from("direct:babel").as[String].splitFoldBody(_.split(",").iterator) {
    _.to("mock:babel1").requireAs[String]
      .processBody(_.toInt + 1)
      .to("mock:babel2").requireAs[Int]
  }("1")((x, y) => s"$x,$y").
    processBody(x => x).
    to("mock:babel3")
}
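
As with splitReduceBody, the data flow of this route can be sketched with plain Scala collections. The splitFold function is a hypothetical name used for illustration; the seed "1" and the merge function are taken from the example above.

```scala
object SplitFoldModel {
  def splitFold(body: String): String =
    body.split(",").iterator                       // split the body into parts
      .map(part => part.toInt + 1)                 // each part becomes an Int
      .foldLeft("1")((acc, part) => s"$acc,$part") // the seed fixes the result type (String)

  val result: String = splitFold("1,2,3")          // "1,2,3,4"
}
```

The seed appears at the start of the result, and the parts (Int) and the merged value (String) have different types, which reduce alone could not express.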

Additional configuration

  • propagateException: when an exception is raised in a sub message, the exception is exposed to the initial message.
  • stopOnException: when an exception is raised in a sub message, the processing of the initial message is stopped.

Aggregation

An aggregation is a way to combine several messages into a new message. An aggregation is declared by answering three questions:

  • How do you combine the messages?
  • How do you group the messages?
  • When is the aggregation complete?
    • When a given number of messages has been aggregated? (CompletionSize)
    • After a period of time? (CompletionInterval)
    • Or a combination?

The DSL contains some default implementations, shown in the following sections:

  • Reduce combines messages with the same type and creates a new message with the same type.
  • Fold takes a seed, combines the messages with this seed and creates a new message with the type of the seed.
  • CamelAggregation and CamelReferenceAggregation (from the io.xtech.babel.camel.model package) define an aggregation using Camel-specific vocabulary.

Reduce

// inputs (1,2,3,4,5,6,7,8,9) -> outputs (6,15,24)
val reduceBody = ReduceBody(
  //defines how message bodies are aggregated
  reduce = (a: Int, b: Int) => a + b,
  //defines how messages are grouped (here, all messages share the same key)
  groupBy = (msg: Message[Int]) => "a",
  //defines when the aggregation completes (here, after 3 messages, on timeout, or when the route stops)
  completionStrategies = List(CompletionSize(3), CompletionTimeout(1000), ForceCompletionOnStop))

import io.xtech.babel.camel.builder.RouteBuilder

val routeDef = new RouteBuilder {
  from("direct:babel").as[Int].
    aggregate(reduceBody).
    to("mock:output")
}
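
The input/output comment at the top of this example can be checked with a plain Scala model. It is a sketch under simplifying assumptions: only CompletionSize(3) is modelled (as "one result per 3 messages"), the timeout and stop strategies are ignored, and none of the names below belong to the Babel API.

```scala
object ReduceModel {
  val inputs: List[Int] = (1 to 9).toList

  // Every message maps to the key "a", so they all land in a single group.
  val groups: Map[String, List[Int]] = inputs.groupBy(_ => "a")

  // Within the group, every 3 messages are reduced into one output.
  val outputs: List[Int] =
    groups("a").grouped(3).map(_.reduce((a, b) => a + b)).toList // List(6, 15, 24)
}
```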

Fold

// inputs (1,2,3,4,5,6,7,8,9) -> outputs ("123","456","789")
val foldBody = FoldBody("",
  //defines how message bodies are aggregated
  (a: String, b: Int) => a + b,
  //defines how messages are grouped (here, all messages share the same key)
  (msg: Message[Int]) => "a",
  //defines when the aggregation completes (here, after 3 messages)
  completionStrategies = List(CompletionSize(3), CompletionFromBatchConsumer))

import io.xtech.babel.camel.builder.RouteBuilder

val routeDef = new RouteBuilder {
  from("direct:babel").as[Int].
    aggregate(foldBody).
    to("mock:output")
}
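
The same kind of plain Scala model illustrates how the seed changes the output type. Again this is only a sketch: it models CompletionSize(3) alone, ignores CompletionFromBatchConsumer, and uses no Babel API.

```scala
object FoldModel {
  val inputs: List[Int] = (1 to 9).toList

  // Each batch of 3 Int bodies is folded into the String seed "".
  val outputs: List[String] =
    inputs.grouped(3).map(_.foldLeft("")((a, b) => a + b)).toList // List("123", "456", "789")
}
```

The inputs are Int and the outputs are String: the result type comes from the seed, not from the messages.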

Camel Aggregation

import io.xtech.babel.camel.builder.RouteBuilder
import org.apache.camel.processor.aggregate.GroupedExchangeAggregationStrategy

val camelAggr = CamelAggregation(MessageExpression((msg: Message[String]) => "1"),
  aggregationStrategy = new GroupedExchangeAggregationStrategy,
  completionStrategies = List(CompletionSize(3), CompletionInterval(1000)))

val routeDef = new RouteBuilder {
  //message bodies are converted to String if required
  from(directConsumer).as[String].
    //aggregates messages using the camelAggr defined above
    aggregate(camelAggr).
    //sends the aggregated message to the mock endpoint
    to("mock:output")
}

CamelReferenceAggregation refers to the aggregation strategy by its id in the bean registry:

//defines which messages are aggregated together
val camelExp = new MessageExpression((a: Message[String]) => "1")

import io.xtech.babel.camel.model.Aggregation.CamelReferenceAggregation

val camelAggr = CamelReferenceAggregation[String, String](
  correlationExpression = camelExp,
  //defines the string id of the aggregation strategy in the bean registry
  "aggregationStrategy",
  completionStrategies = List(CompletionSize(3)))

val routeDef = new RouteBuilder {
  //message bodies are converted to String if required
  from(directConsumer).as[String].
    //aggregates messages using the camelAggr defined above
    aggregate(camelAggr).
    //sends the aggregated message to the mock endpoint
    to("mock:output")
}

Wire-Tap

The wiretap keyword routes a copy of each message to another location while the message continues to be processed by the regular flow.

from("direct:input-babel").
  //Incoming messages are sent to the direct endpoint
  //   and to the next mock endpoint
  wiretap("direct:babel-tap")
  .to("mock:output-babel")

Validate

The validate keyword validates messages passing through a route using a function or a Camel predicate.

A message is valid only if the predicate or function returns true. Otherwise, an exception is thrown.
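
The pass-or-throw behaviour can be sketched in plain Scala. The names below are hypothetical: validateBodyModel is not the Babel keyword, and InvalidBody merely stands in for whatever exception the real route raises.

```scala
import scala.util.Try

object ValidateModel {
  // Hypothetical stand-in for the exception thrown on an invalid message.
  final class InvalidBody(body: Any) extends RuntimeException(s"invalid body: $body")

  // Returns the body unchanged when it is valid, throws otherwise.
  def validateBodyModel[A](body: A)(isValid: A => Boolean): A =
    if (isValid(body)) body else throw new InvalidBody(body)

  val passed: Int = validateBodyModel(1)(_ == 1)           // continues with 1
  val failed: Try[Int] = Try(validateBodyModel(2)(_ == 1)) // Failure(InvalidBody)
}
```

A valid message is passed on untouched; validation never modifies the body, it only decides whether processing continues.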

Camel Predicate

val routeBuilder = new RouteBuilder {

  from("direct:input").as[Int].
    validate(Builder.body().isEqualTo(1)).
    to("mock:output")
}

Message Function

val routeBuilder = new RouteBuilder {

  from("direct:input").as[Int].
    validate(msg => msg.body == Some(1)).
    to("mock:output")
}

Body Function

val routeBuilder = new RouteBuilder {

  from("direct:input").as[Int].
    validateBody(body => body == 1).
    to("mock:output")
}