A transformation is a way to modify the message.
A marshaller is a way to change the format of the message, for example XML <-> JSON.
With the marshal and unmarshal keywords, you can choose the direction of the marshalling.
Both keywords accept either an instance of a Camel DataFormat object or a reference to an object in a registry (e.g. a Spring context).
import io.xtech.babel.camel.builder.RouteBuilder
import org.apache.camel.impl.SimpleRegistry
val routeDef = new RouteBuilder {
  from("direct:input").
    //the message body is marshalled using the bean id registered below
    marshal("csvMarshaller").
    to("mock:output")
}
val registry = new SimpleRegistry
//csvDataFormat is an org.apache.camel.spi.DataFormat instance
registry.put("csvMarshaller", csvDataFormat)
import io.xtech.babel.camel.builder.RouteBuilder
val routeDef = new RouteBuilder {
  from("direct:input").
    //csvDataFormat is an org.apache.camel.spi.DataFormat instance
    marshal(csvDataFormat).
    to("mock:output")
}
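The two directions can be pictured as a pair of inverse functions. The following is only a minimal sketch in plain Scala: `CsvFormatSketch` and its methods are hypothetical names made up for illustration, and they are not the Camel DataFormat interface.

```scala
// Hypothetical stand-in for a data format (NOT the Camel DataFormat API).
object CsvFormatSketch {
  // marshal: in-memory representation -> wire format
  def marshal(fields: List[String]): String = fields.mkString(",")

  // unmarshal: wire format -> in-memory representation
  // (the -1 limit keeps trailing empty fields)
  def unmarshal(line: String): List[String] = line.split(",", -1).toList
}
```

For non-empty lists whose fields contain no comma, unmarshalling the marshalled value gives back the original list.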
The sort keyword sorts a part of a message. It accepts two parameters: an expression that selects the part of the message to sort, and an optional Ordering that defines the sort order.
The result of this keyword is of type java.util.List.
val routeBuilder = new RouteBuilder {
  //the input string is "4,3,1,2"
  from("direct:input").as[String].
    //the message body is split and then its output is sorted
    sort(msg => msg.body.getOrElse("").split(",")).
    //the output is List("1", "2", "3", "4")
    to("mock:output")
}
You may also provide an Ordering to the sort:
import java.util.{ List => JList }
import io.xtech.babel.camel.builder.RouteBuilder
import org.apache.camel.builder.Builder
val routeBuilder = new RouteBuilder {
  //the sort keyword expects a Java list type
  from("direct:input").as[JList[Int]].
    //the elements of the body are sorted using EvenOddOrdering
    sort(Builder.body(), EvenOddOrdering).
    to("mock:output")
}
import scala.math.Ordering.IntOrdering
//EvenOddOrdering orders Integers by parity, even numbers first:
//1,2,3,4 becomes 2,4,1,3
object EvenOddOrdering extends IntOrdering {
  override def compare(a: Int, b: Int): Int =
    if ((a % 2) == (b % 2)) {
      Ordering.Int.compare(a, b)
    }
    else if (a % 2 == 0) {
      -1
    }
    else {
      1
    }
}
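To see what such an Ordering does outside of a route, the same rule can be tried on ordinary Scala collections. This is a sketch using only the standard library; `EvenOddSketch` and `evenFirst` are names chosen for this illustration, and nothing here goes through Babel.

```scala
object EvenOddSketch {
  // Same comparison rule as EvenOddOrdering, written against Ordering[Int]
  val evenFirst: Ordering[Int] = new Ordering[Int] {
    def compare(a: Int, b: Int): Int =
      if (a % 2 == b % 2) Integer.compare(a, b) // same parity: natural order
      else if (a % 2 == 0) -1                   // even before odd
      else 1
  }

  // Default ordering on the split string (lexicographic on String)
  val naturalOrder: List[String] = "4,3,1,2".split(",").toList.sorted

  // Custom ordering passed explicitly
  val evenOddOrder: List[Int] = List(1, 2, 3, 4).sorted(evenFirst)
}
```

Here `naturalOrder` is List("1", "2", "3", "4") and `evenOddOrder` is List(2, 4, 1, 3).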
The resequence keyword is useful for re-ordering messages that arrive out of order. There are two algorithms: batch and stream.
Batch resequencing collects messages into a batch, sorts the messages and sends them to their output.
import io.xtech.babel.camel.builder.RouteBuilder
import org.apache.camel.model.config.BatchResequencerConfig
//the resequencing would be done in a batch manner
val batchConfiguration = new BatchResequencerConfig()
val routeBuilder = new RouteBuilder {
  //message bodies are converted to Integer if required
  from("direct:input").as[Int].
    //resequencing is based on the body of the message
    resequence(m => m.body.getOrElse(0), batchConfiguration).
    //sends the received Integers in a resequenced order to the mock endpoint
    to("mock:output")
}
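The batch idea (collect, sort, emit) can be sketched in a few lines of plain Scala. This is an illustration only: `BatchResequenceSketch` and its `batchSize` parameter are assumptions of the sketch, and a real batch resequencer would also need to close a batch on a timeout rather than on size alone.

```scala
object BatchResequenceSketch {
  // Collects up to batchSize messages, sorts each batch, then emits it.
  // Ordering is only restored within a batch, never across batches.
  def resequence[A: Ordering](incoming: Seq[A], batchSize: Int): Seq[A] =
    incoming.grouped(batchSize).flatMap(batch => batch.sorted).toList
}
```

With a batch size of 3, the input 3, 1, 2, 6, 5, 4 comes out fully ordered. With a batch size of 2, the input 4, 1, 3, 2 comes out as 1, 4, 2, 3, which shows why the batch must be large enough to cover the expected disorder.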
Stream resequencing re-orders (continuous) message streams based on the detection of gaps between messages.
import io.xtech.babel.camel.builder.RouteBuilder
import org.apache.camel.model.config.StreamResequencerConfig
//the resequencing would be done in a streaming manner
val streamConfiguration = new StreamResequencerConfig()
val routeBuilder = new RouteBuilder {
  //message bodies are converted to Long if required
  from("direct:input").as[Long].
    //resequencing is based on the body of the message
    resequence(m => m.body.getOrElse(0L), streamConfiguration).
    //sends the received Longs in a resequenced order to the mock endpoint
    to("mock:output")
}
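Gap detection can be illustrated with a small buffer that holds a message back until all of its predecessors have been emitted. The sketch below is plain Scala with hypothetical names (`StreamResequenceSketch`, `first`). It assumes that the successor of n is n + 1 and deliberately leaves out the timeout and capacity limits a real stream resequencer would need.

```scala
import scala.collection.mutable

object StreamResequenceSketch {
  // Emits each number as soon as every predecessor has been emitted.
  // `first` is the sequence number expected first (an assumption of this sketch).
  def resequence(incoming: Seq[Long], first: Long): List[Long] = {
    val pending = mutable.SortedSet.empty[Long]   // messages waiting for a gap to close
    val emitted = mutable.ListBuffer.empty[Long]  // messages already sent downstream
    var expected = first
    for (n <- incoming) {
      pending += n
      // Drain the buffer while its smallest element is the expected one
      while (pending.headOption.contains(expected)) {
        emitted += expected
        pending -= expected
        expected += 1
      }
    }
    emitted.toList
  }
}
```

Unlike the batch approach, a message that arrives in order is forwarded immediately. A message that follows a gap stays in the buffer. In this sketch it stays there forever if the gap never closes, which is the situation a timeout is meant to handle.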
The enrich and pollEnrich keywords retrieve additional data from an endpoint and let you combine the original message and the new message with an aggregation strategy. The enrich keyword uses a request-reply pattern with the endpoint.
val routeDef = new RouteBuilder {
  from("direct:enricherRoute").to("mock:enricher")
  from("direct:input").
    //enriches the input with the enricherRoute messages
    // using the aggregationStrategy
    enrich("direct:enricherRoute", "aggregationStrategy").
    to("mock:output")
}
val registry = new SimpleRegistry
registry.put("aggregationStrategy",
  //the aggregation strategy is stored in a registry
  new ReduceBodyAggregationStrategy[String]((a, b) => a + b))
camelContext.setRegistry(registry)
The pollEnrich keyword instead polls the endpoint and gives up after a timeout if no message has been received.
val routeDef = new RouteBuilder {
  from("direct:input").
    //polls the seda endpoint, waiting at most 1000 (milliseconds in Camel)
    pollEnrich("seda:enrichRoute", "aggregationStrategy", 1000).
    to("mock:output")
}
val registry = new SimpleRegistry
registry.put("aggregationStrategy",
  new ReduceBodyAggregationStrategy[String]((a, b) => a + b))
camelContext.setRegistry(registry)
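Polling with a timeout can be pictured with a blocking queue from the Java standard library. This is a sketch only: `PollEnrichSketch`, the queue standing in for the endpoint, and the choice to keep the original body on timeout are all assumptions of the illustration, not a description of what Babel or Camel do internally.

```scala
import java.util.concurrent.{ LinkedBlockingQueue, TimeUnit }

object PollEnrichSketch {
  // Same shape as the (a, b) => a + b function used in the examples above
  val aggregate: (String, String) => String = (a, b) => a + b

  // Waits up to timeoutMillis for extra data from `source`.
  // Assumption of this sketch: the original body is kept as-is on timeout.
  def pollEnrich(original: String, source: LinkedBlockingQueue[String], timeoutMillis: Long): String =
    Option(source.poll(timeoutMillis, TimeUnit.MILLISECONDS)) match {
      case Some(extra) => aggregate(original, extra)
      case None        => original
    }
}
```

The queue's `poll` returns null when the timeout elapses, which is why the result is wrapped in `Option` before the aggregation function is applied.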
You can transform a message with your own business logic. Such a data transformation may be defined either by a function or by a bean. The functional way is always preferred in the Babel philosophy.
You can transform a message with a function.
The processBody keyword works on message bodies.
import io.xtech.babel.camel.builder.RouteBuilder
val routeDef = new RouteBuilder {
  //message bodies are converted to String if required
  from("direct:input").as[String].
    //processBody concatenates the received string with "bli"
    processBody(string => string + "bli").
    //sends the concatenated string to the mock endpoint
    to("mock:output")
}
The process keyword works on messages.
import io.xtech.babel.camel.builder.RouteBuilder
val routeDef = new RouteBuilder {
  //message bodies are converted to String if required
  from("direct:input").as[String].
    //process returns a new Message whose body has "bli" appended
    process(msg => msg.withBody(_ + "bli")).
    //sends the concatenated string to the mock endpoint
    to("mock:output")
}
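The relationship between the two keywords can be sketched with a simplified message type. `MessageSketch` and `Msg` are hypothetical and much smaller than Babel's real Message type. The sketch only assumes, as the examples above suggest, that the body is optional and that `withBody` applies a function to it.

```scala
object MessageSketch {
  // Hypothetical, simplified message: a body plus headers.
  final case class Msg[A](body: Option[A], headers: Map[String, Any] = Map.empty) {
    // Returns a new message with a transformed body; headers are untouched.
    def withBody[B](f: A => B): Msg[B] = copy(body = body.map(f))
  }

  // A body-level function lifted to a message-level function
  def processBody[A, B](f: A => B): Msg[A] => Msg[B] = msg => msg.withBody(f)
}
```

In this model, working on the body is a special case of working on the message: the body-level function never sees the headers, while a message-level function could read or change them as well.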
You can transform a message with a bean (using the Camel way of handling beans).
Warning
This keyword removes type safety for the rest of your route. It has therefore been deprecated and may be removed if no user requires it.
With a reference in Camel registry (or in Spring Context):
val routeDef = new RouteBuilder {
  from("direct:input").
    //bean keyword is deprecated!
    bean("myBean").to("mock:output")
}
val routeDef = new RouteBuilder {
  from("direct:input").
    //the received messages are passed to the "doIt" method
    // of the bean registered with the id "myBean"
    //bean keyword is deprecated!
    bean("myBean", "doIt").
    //the bean keyword discards the body type for the following keywords
    to("mock:output")
}
With an instance:
val routeDef = new RouteBuilder {
  from("direct:input").
    //the received messages are passed to
    // the matching method of the TestBean instance
    //bean keyword is deprecated!
    bean(new TestBean).
    to("mock:output")
}
With a class:
val routeDef = new RouteBuilder {
  from("direct:input").
    //the received messages are passed to
    // the matching method of the TestBean class
    //bean keyword is deprecated!
    bean(classOf[TestBean]).
    to("mock:output")
}
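Since the functional way is preferred, existing bean logic can often be exposed as an ordinary typed function instead. The sketch below uses a hypothetical `GreetingBean`, because the TestBean used above is not shown here. It only illustrates how a method is lifted to a function in plain Scala.

```scala
object BeanAsFunctionSketch {
  // Hypothetical bean standing in for TestBean
  class GreetingBean {
    def doIt(body: String): String = body + "bli"
  }

  val bean = new GreetingBean

  // The method is lifted to a function value; its input and output
  // types stay visible to the compiler.
  val typed: String => String = bean.doIt
}
```

A function such as `typed` has the same shape as the one given to processBody in the earlier example, so the body type would remain known for the rest of the route.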