A transformation is a way to modify the message.
A marshaller changes the format of a message, for example from XML to JSON and back.
With the marshal and unmarshal keywords, you choose the direction of the marshalling.
Both keywords accept either an instance of a Camel DataFormat or a reference to an object in a registry (e.g. a Spring context).
import io.xtech.babel.camel.builder.RouteBuilder
val routeDef = new RouteBuilder {
from("direct:input").
//the message body is marshalled by the data format registered below under the id "csvMarshaller"
marshal("csvMarshaller").
to("mock:output")
}
val registry = new SimpleRegistry
//csvDataFormat is a org.apache.camel.spi.DataFormat instance
registry.put("csvMarshaller", csvDataFormat)
import io.xtech.babel.camel.builder.RouteBuilder
val routeDef = new RouteBuilder {
from("direct:input").
//csvDataFormat is a org.apache.camel.spi.DataFormat instance
marshal(csvDataFormat).
to("mock:output")
}
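The direction of marshalling can be pictured without Camel at all. The sketch below is only an illustration: `LineFormat` and `CsvLine` are hypothetical names, not part of Babel or Camel, and the CSV handling ignores quoting and escaping.

```scala
object MarshalSketch {
  // Hypothetical stand-in for a data format: it converts between an
  // in-memory value and its textual wire representation.
  trait LineFormat[A] {
    def marshal(value: A): String   // object -> wire format
    def unmarshal(wire: String): A  // wire format -> object
  }

  // Naive CSV line format (no quoting, no escaping).
  object CsvLine extends LineFormat[List[String]] {
    def marshal(value: List[String]): String = value.mkString(",")
    def unmarshal(wire: String): List[String] = wire.split(",", -1).toList
  }
}
```

Marshalling followed by unmarshalling should give back the original value; a real DataFormat works on message bodies and streams rather than on plain strings.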
The sort keyword sorts a part of a message. It accepts two parameters: an expression that yields the elements to sort and, optionally, an Ordering used to compare them.
The result of this keyword is of type java.util.List.
val routeBuilder = new RouteBuilder {
//the input string is "4,3,1,2"
from("direct:input").as[String].
//the message body is split and then its output is sorted
sort(msg => msg.body.getOrElse("").split(",")).
//the output is List("1", "2", "3", "4")
to("mock:output")
}
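What the route above does to the body can be reproduced in plain Scala. This is a sketch outside of any Camel context; `SortSketch` and `splitAndSort` are hypothetical names chosen for the illustration.

```scala
import java.util.{ Arrays, List => JList }

object SortSketch {
  // Splits a comma-separated body and sorts the parts, returning a
  // java.util.List, which is the result type documented for sort.
  def splitAndSort(body: String): JList[String] =
    Arrays.asList(body.split(",").sorted: _*)
}
```

The parts are compared as strings, so "10" would sort before "2"; converting the elements to numbers first would be needed for a numeric order.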
You may also provide an Ordering to the sort:
import java.util.{ List => JList }
import io.xtech.babel.camel.builder.RouteBuilder
import org.apache.camel.builder.Builder
val routeBuilder = new RouteBuilder {
//The sort keyword expects Java list type
from("direct:input").as[JList[Int]].
//the exchanges are sorted based on their body
sort(Builder.body(), EvenOddOrdering).
to("mock:output")
}
import scala.math.Ordering.IntOrdering
//EvenOddOrdering sorts even integers before odd ones and keeps
// the natural order within each group
//1,2,3,4 becomes 2,4,1,3
object EvenOddOrdering extends IntOrdering {
override def compare(a: Int, b: Int): Int = (a, b) match {
case (a, b) if (a % 2) == (b % 2) =>
Ordering.Int.compare(a, b)
case (a, b) if a % 2 == 0 =>
-1
case (a, b) =>
1
}
}
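The effect of this ordering can be checked on an ordinary Scala list. The sketch restates the same comparison rule as a plain `Ordering[Int]`; `EvenOddSketch` and its members are hypothetical names, and the values are assumed to be non-negative (with mixed signs, `%` yields both 1 and -1 for odd numbers, so the parity test would no longer be reliable).

```scala
object EvenOddSketch {
  // Same rule as EvenOddOrdering: even before odd, natural order otherwise.
  val evenBeforeOdd: Ordering[Int] = new Ordering[Int] {
    def compare(a: Int, b: Int): Int =
      if (a % 2 == b % 2) java.lang.Integer.compare(a, b) // same parity
      else if (a % 2 == 0) -1                              // a is even, b is odd
      else 1                                               // a is odd, b is even
  }

  def sortEvenFirst(values: List[Int]): List[Int] = values.sorted(evenBeforeOdd)
}
```

For example, `EvenOddSketch.sortEvenFirst(List(1, 2, 3, 4))` yields `List(2, 4, 1, 3)`, matching the comment in the ordering above.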
The resequence keyword is useful for sorting messages that arrive out of order. Two algorithms are available: batch and stream.
Batch resequencing collects messages into a batch, sorts the messages and sends them to their output.
import io.xtech.babel.camel.builder.RouteBuilder
import org.apache.camel.model.config.BatchResequencerConfig
//the resequencing would be done in a batch manner
val batchConfiguration = new BatchResequencerConfig()
val routeBuilder = new RouteBuilder {
//message bodies are converted to Integer if required
from("direct:input").as[Int].
//resequencing is based on the body of the message
resequence(m => m.body.getOrElse(0), batchConfiguration).
//sends the received Integers in resequenced order to the mock endpoint
to("mock:output")
}
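The batch idea itself (collect, sort, emit) can be sketched on a plain sequence. This is a simplified model, not Camel's implementation: it cuts batches by size only and ignores any timeout, and `BatchResequenceSketch` is a hypothetical name.

```scala
object BatchResequenceSketch {
  // Collects the input into batches of `batchSize`, sorts each batch by
  // `key` and emits the batches one after the other.
  def resequence[A, K](input: Seq[A], batchSize: Int)(key: A => K)(
    implicit ord: Ordering[K]): List[A] =
    input.grouped(batchSize).flatMap(_.sortBy(key)).toList
}
```

Ordering is only guaranteed inside a batch: with a batch size of 2, the input 4, 3, 2, 1 comes out as 3, 4, 1, 2.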
Stream resequencing re-orders (continuous) message streams based on the detection of gaps between messages.
import io.xtech.babel.camel.builder.RouteBuilder
import org.apache.camel.model.config.StreamResequencerConfig
//the resequencing would be done in a streaming manner
val streamConfiguration = new StreamResequencerConfig()
val routeBuilder = new RouteBuilder {
//message bodies are converted to Long if required
from("direct:input").as[Long].
//resequencing is based on the body of the message
resequence(m => m.body.getOrElse(0), streamConfiguration).
//sends the received Longs in resequenced order to the mock endpoint
to("mock:output")
}
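Gap detection can be illustrated with sequence numbers: a message is held back until every predecessor has been seen. The sketch below is a deliberately reduced model under that assumption (consecutive Long sequence numbers, no timeout, unbounded buffer); `StreamResequenceSketch` is a hypothetical name and not part of Camel or Babel.

```scala
object StreamResequenceSketch {
  // Emits sequence numbers in order, holding back any number that arrives
  // before its predecessor until the gap is filled.
  def resequence(input: Seq[Long], first: Long): Vector[Long] = {
    var expected = first
    var pending = Set.empty[Long]
    val out = Vector.newBuilder[Long]
    for (n <- input) {
      pending += n
      // Flush everything that is now contiguous with what was emitted.
      while (pending.contains(expected)) {
        out += expected
        pending -= expected
        expected += 1
      }
    }
    out.result()
  }
}
```

In this model a gap that is never filled blocks everything behind it, which is why a real stream resequencer typically needs some limit on how long or how much it buffers.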
The enrich and pollEnrich keywords retrieve additional data from an endpoint and let you combine the original and the new message with an aggregator.
The enrich keyword uses a request-reply pattern with an endpoint (e.g. a web service) to obtain more data.
val routeDef = new RouteBuilder with Mock {
from("direct:enricherRoute").mock("enricher")
from("direct:input").
requireAs[String].
//enriches the input with the reply of the enricherRoute
// using the aggregation function
enrich("direct:enricherRoute", (a: String, b: Any) => s"${a}${b.toString.toInt}").
mock("output")
}
The pollEnrich keyword uses a Polling Consumer on an endpoint (e.g. JMS, SEDA, File) to obtain more data.
val routeDef = new RouteBuilder {
from("direct:input").
//polls the seda endpoint for up to 1000 ms and merges the result
// using the aggregation function
pollEnrich("seda:enrichRoute", (a, b: Any) => s"$a${b.toString}", 1000).
to("mock:output")
}
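The difference between the two keywords is where the additional data comes from. The following sketch models it in plain Scala under simplifying assumptions: a function stands in for the request-reply endpoint and a `LinkedBlockingQueue` for the polled endpoint. `EnrichSketch` and its methods are hypothetical and do not reflect Babel's signatures.

```scala
import java.util.concurrent.{ LinkedBlockingQueue, TimeUnit }

object EnrichSketch {
  // Request-reply: the original body is sent to a service and the reply
  // is merged with it.
  def enrich[A, B, C](original: A, requestReply: A => B)(aggregate: (A, B) => C): C =
    aggregate(original, requestReply(original))

  // Polling: data is taken from a queue, waiting at most `timeoutMs`;
  // None means that nothing arrived in time.
  def pollEnrich[A, B <: AnyRef, C](original: A, source: LinkedBlockingQueue[B], timeoutMs: Long)(
    aggregate: (A, Option[B]) => C): C =
    aggregate(original, Option(source.poll(timeoutMs, TimeUnit.MILLISECONDS)))
}
```

In both cases the aggregation function decides what the outgoing body looks like; in the polling case it also has to decide what to do when no data was available.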
You may also use a Camel or Babel AggregationStrategy to define how the incoming message and the enrichment data are merged:
val routeDef = new RouteBuilder {
from("direct:input").
pollEnrichRef("seda:enrichRoute", "aggregationStrategy", 1000).
to("mock:output")
}
val registry = new SimpleRegistry
registry.put("aggregationStrategy",
new ReduceBodyAggregationStrategy[String]((a, b) => a + b))
camelContext.setRegistry(registry)
val aggregationStrategy = new ReduceBodyAggregationStrategy[String]((a, b) => a + b)
val routeDef = new RouteBuilder {
from("direct:input").
pollEnrichAggregation("seda:enrichRoute", aggregationStrategy, 1000).
to("mock:output")
}
val routeDef = new RouteBuilder {
from("direct:enricherRoute").to("mock:enricher")
from("direct:input").
//enriches the input with the enricherRoute messages
// using the aggregationStrategy
enrichRef("direct:enricherRoute", "aggregationStrategy").
to("mock:output")
}
val registry = new SimpleRegistry
registry.put("aggregationStrategy",
//the used aggregation strategy is stored in a registry
new ReduceBodyAggregationStrategy[String]((a, b) => a + b))
camelContext.setRegistry(registry)
val aggregationStrategy = new ReduceBodyAggregationStrategy[String]((a, b) => a + b)
val routeDef = new RouteBuilder {
from("direct:enricherRoute").to("mock:enricher")
from("direct:input").
//enriches the input with the enricherRoute messages
// using the aggregationStrategy
enrich("direct:enricherRoute", aggregationStrategy).
to("mock:output")
}
Warning
It is not recommended to use the enrich and pollEnrich keywords with the io.xtech.babel.camel.model.FoldBodyAggregationStrategy. The only supported aggregation strategies are io.xtech.babel.camel.model.ReduceBodyAggregationStrategy and custom implementations of the org.apache.camel.processor.aggregate.AggregationStrategy interface.
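The difference in shape between a reduce and a fold can be seen with the standard collection methods of the same names. This is only an analogy in plain Scala and makes no claim about Babel's internals; `AggregationShapeSketch` and `Result` are hypothetical names.

```scala
object AggregationShapeSketch {
  val bodies = List("bla", "123")

  // Reduce shape: (A, A) => A, no seed, the result keeps the body type.
  val reduced: String = bodies.reduce((a, b) => a + b)

  // Fold shape: (B, A) => B, starts from a seed and may change the type.
  final case class Result(string: String)
  val folded: Result =
    bodies.foldLeft(Result(""))((acc, next) => Result(acc.string + next.toString))
}
```

A reduce combines the original body and the enrichment data directly, whereas a fold introduces a seed and a separate accumulator type.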
You can transform a message with your own business logic. Such a data transformation may be defined either by a function or by a bean. The functional way is always preferred in the Babel philosophy.
You can transform a message with a function.
The processBody keyword works on message bodies.
import io.xtech.babel.camel.builder.RouteBuilder
val routeDef = new RouteBuilder {
//message bodies are converted to String if required
from("direct:input").as[String].
//processBody concatenates received string with "bli"
processBody(JavaProcessors.append(_)).
//sends the concatenated string to the mock endpoint
to("mock:output")
}
import io.xtech.babel.camel.builder.RouteBuilder
val routeDef = new RouteBuilder {
//message bodies are converted to String if required
from("direct:input").as[String].
//processBody concatenates received string with "bli"
processBody(string => string + "bli").
//sends the concatenated string to the mock endpoint
to("mock:output")
}
The process keyword works on messages.
import io.xtech.babel.camel.builder.RouteBuilder
val routeDef = new RouteBuilder {
//message bodies are converted to String if required
from("direct:input").as[String].
//process builds a new Message whose body is transformed
process(msg => msg.withBody(_ + "bli")).
//sends the concatenated string to the mock endpoint
to("mock:output")
}
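The distinction between the two keywords is the argument the function receives: only the body, or the whole message. A minimal model of that distinction, assuming a simplified message type (`SketchMessage` and the two helper functions are hypothetical, not Babel's classes):

```scala
object ProcessSketch {
  // Hypothetical, simplified message: an optional body plus headers.
  final case class SketchMessage[A](body: Option[A], headers: Map[String, Any] = Map.empty) {
    def withBody[B](f: A => B): SketchMessage[B] = copy(body = body.map(f))
  }

  // processBody-style: the function only ever sees the body.
  def processBody[A, B](msg: SketchMessage[A])(f: A => B): SketchMessage[B] =
    msg.withBody(f)

  // process-style: the function sees the whole message (body and headers)
  // and returns a new one.
  def process[A, B](msg: SketchMessage[A])(
    f: SketchMessage[A] => SketchMessage[B]): SketchMessage[B] = f(msg)
}
```

Both styles yield the same result when only the body changes; the message-level style becomes useful as soon as headers have to be read or modified as well.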
Note
Babel provides a minimal API to modify the Message or Body you are dealing with in your transformation:
Message.withBody lets you define how its Body is transformed (using a function):
//append a string to the message's body
val newMsg = message.withBody(_ + "bli")
Message.withHeader lets you add a new Header (using two arguments: key and value):
val newMsg = message.withHeader("c", 42)
Message.withHeaders lets you define how its Headers are transformed (using a function):
//add headers to the message
val newMsg = message.withHeaders(headers => headers ++ Map("c" -> 42, "d" -> "dd"))
//replace headers by only one
val newMsg = message.withHeaders(headers => Map("c" -> 42))
It also provides methods concerning the Exchange properties, exceptions and MessageExchangePattern.
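The header methods can be tried on a small immutable model. The case class below is a hypothetical stand-in that mirrors the method names used above; it is not Babel's Message and assumes that each call returns a new message and leaves the original untouched.

```scala
object HeaderSketch {
  // Hypothetical immutable message; each with* method returns a new copy.
  final case class SketchMessage(body: String, headers: Map[String, Any] = Map.empty) {
    def withHeader(key: String, value: Any): SketchMessage =
      copy(headers = headers + (key -> value))
    def withHeaders(f: Map[String, Any] => Map[String, Any]): SketchMessage =
      copy(headers = f(headers))
  }
}
```

With this model, `withHeader("c", 42)` adds one entry, while `withHeaders(_ => Map("c" -> 42))` discards all previous headers.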
You can also transform a message with a bean (using Camel's way of handling beans).
Warning
This keyword removes type safety for the rest of your route. It has therefore been deprecated and may be removed if no user requires it.
With a reference in Camel registry (or in Spring Context):
val routeDef = new RouteBuilder {
from("direct:input").
//bean keyword is deprecated!
bean("myBean").to(mockProducer)
}
val routeDef = new RouteBuilder {
from("direct:input").
//each received message is passed to the "doIt" method
// of the bean registered with the id "myBean"
//bean keyword is deprecated!
bean("myBean", "doIt").
//the bean keyword discards the body type for the keywords that follow
to(mockProducer)
}
With an instance:
val routeDef = new RouteBuilder {
from("direct:input").
//each received message is passed to
// the matching method of the TestBean instance
//bean keyword is deprecated!
bean(new TestBean).
to(mockProducer)
}
With a class:
val routeDef = new RouteBuilder {
from("direct:input").
//each received message is passed to
// the matching method of the TestBean class
//bean keyword is deprecated!
bean(classOf[TestBean]).
to(mockProducer)
}
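Why a bean costs type safety can be illustrated with plain reflection: a method looked up by name returns a value the compiler only knows as `AnyRef`. The sketch is a simplified analogy, since Camel's bean binding is more elaborate than a single reflective call; `BeanSketch`, `SketchBean` and `invokeByName` are hypothetical names.

```scala
object BeanSketch {
  // Hypothetical bean with one business method.
  class SketchBean {
    def doIt(body: String): String = body + "bli"
  }

  // A typed function keeps the String type for whatever comes next.
  val typed: String => String = _ + "bli"

  // A method looked up by name is invoked reflectively, so the compiler
  // only knows the result as AnyRef.
  def invokeByName(bean: AnyRef, method: String, body: String): AnyRef =
    bean.getClass.getMethod(method, classOf[String]).invoke(bean, body)
}
```

Both calls append "bli" to the input, but only the function-based variant lets the compiler check the next step of a route against the body type.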