Babel Camel Transformations

A transformation is a way to modify the message.

Marshalling

A marshaller is a way to change the format of the message. For example : XML <-> JSON

With the marshal and unmarshal keyword, you can choose the direction of the marshalling.

The keywords accepts an instance of a Camel DataFormat object or a reference to an object in a registry (ex : Spring Context).

With a Dataformat

import io.xtech.babel.camel.builder.RouteBuilder

val routeDef = new RouteBuilder {
  from("direct:input").
    //Message body type is transformed using a bean id defined lower
    marshal("csvMarshaller").
    to("mock:output")
}

val registry = new SimpleRegistry
//csvDataFormat is a org.apache.camel.spi.DataFormat instance
registry.put("csvMarshaller", csvDataFormat)

With a reference

import io.xtech.babel.camel.builder.RouteBuilder

val routeDef = new RouteBuilder {
  from("direct:input").
    //csvDataFormat is a org.apache.camel.spi.DataFormat instance
    marshal(csvDataFormat).
    to("mock:output")
}

Sorting

The sort keyword give a way to sort a part of a message. It accepts two parameters:

  • a function that split a part of the message.
  • an optional comparator that give how the ordering will be done.

The result of the processing of this keyword will be of type java.util.List

val routeBuilder = new RouteBuilder {
  //the input string is "4,3,1,2"
  from("direct:input").as[String].
    //the message body is split and then its output is sorted
    sort(msg => msg.body.getOrElse("").split(",")).
    //the output is List("1", "2", "3", "4")
    to("mock:output")
}

You may also provide an Ordering to the sort:

import java.util.{ List => JList }

import io.xtech.babel.camel.builder.RouteBuilder
import org.apache.camel.builder.Builder

val routeBuilder = new RouteBuilder {
  //The sort keyword expects Java list type
  from("direct:input").as[JList[Int]].
    //the exchanges are sorted based on their body
    sort(Builder.body(), EvenOddOrdering).
    to("mock:output")
}
import scala.math.Ordering.IntOrdering

//The EvenOddOrdering would order the Integer depending on
//   if they are even or odd
//1,2,3,4 becomes 2,4,1,3
object EvenOddOrdering extends IntOrdering {
  override def compare(a: Int, b: Int): Int = (a, b) match {
    case (a, b) if (a % 2) == (b % 2) =>
      Ordering.Int.compare(a, b)
    case (a, b) if a % 2 == 0 =>
      -1
    case (a, b) =>
      1
  }
}

Resequencer

The resequence keyword is useful for sorting messages coming out of order. There are two algorithms :

Batch resequencing collects messages into a batch, sorts the messages and sends them to their output.

import io.xtech.babel.camel.builder.RouteBuilder
import org.apache.camel.model.config.BatchResequencerConfig

//the resequencing would be done in a batch manner
val batchConfiguration = new BatchResequencerConfig()

val routeBuilder = new RouteBuilder {
  //message bodies are converted to Integer if required
  from("direct:input").as[Int].
    //resequencing is based on the body of the message
    resequence(m => m.body.getOrElse(0), batchConfiguration).
    //sends received Integer in a resquenced sequence to the mock endpoint
    to("mock:output")
}

Stream resequencing re-orders (continuous) message streams based on the detection of gaps between messages.

import io.xtech.babel.camel.builder.RouteBuilder
import org.apache.camel.model.config.StreamResequencerConfig

//the resequencing would be done in a streaming manner
val streamConfiguration = new StreamResequencerConfig()

val routeBuilder = new RouteBuilder {
  //message bodies are converted to Long if required
  from("direct:input").as[Long].
    //resequencing is based on the body of the message
    resequence(m => m.body.getOrElse(0), streamConfiguration).
    //sends the received Long in a resquenced sequence to the mock endpoint
    to("mock:output")
}

Enrich

The enrich and pollEnrich keywords retrieve additional data from an endpoint and let you combine the original and the new message with an aggregator.

The enrich is using a request-reply pattern with a endpoint (ex: Web Service) to obtain more data.

val routeDef = new RouteBuilder {
  from("direct:input").
    pollEnrich("seda:enrichRoute", (a, b: Any) => s"$a${b.toString}", 1000).
    to("mock:output")
}

The pollEnrich is using a Polling Consumer from an endpoint (ex: JMS, SEDA, File) to obtain more data.

      val routeDef = new RouteBuilder with Mock {
        from("direct:enricherRoute").mock("enricher")

        from("direct:input").
          requireAs[String].
          //enriches the input with the enricherRoute messages
          //  using the aggregationStrategy
          enrich("direct:enricherRoute", (a: String, b: Any) => s"${a}${b.toString.toInt}").
          mock("output")
      }


      routeDef.addRoutesToCamelContext(camelContext)
      camelContext.start()

      val mockEndpoint = camelContext.mockEndpoint("output")
      val enricherMockEndpoint = camelContext.mockEndpoint("enricher")
      enricherMockEndpoint.returnReplyBody(new SimpleBuilder("123"))

      mockEndpoint.expectedBodiesReceived("bla123", "bli123")

      val producer = camelContext.createProducerTemplate()
      producer.sendBody("direct:input", "bla")
      producer.sendBody("direct:input", "bli")

      mockEndpoint.assertIsSatisfied()
    }

    "do not work with a FoldBodyAggregationStrategy" in new camel {

      import io.xtech.babel.camel.builder.RouteBuilder

      case class Result(string: String) {
        def fold(next: String): Result = Result(string + next)
      }

      val aggregationStrategy = new FoldBodyAggregationStrategy[String, Result](Result(""), (a, b) => a.fold(b))

      val routeDef = new RouteBuilder {
        from("direct:enricherRoute").to("mock:enricher")

        from("direct:input").
          enrich("direct:enricherRoute", aggregationStrategy).
          to("mock:output")
      }

      routeDef.addRoutesToCamelContext(camelContext)
      camelContext.start()

      val enricherMockEndpoint = camelContext.mockEndpoint("enricher")
      enricherMockEndpoint.returnReplyBody(new SimpleBuilder("123"))

      val producer = camelContext.createProducerTemplate()
      producer.sendBody("direct:input", "bla") should throwA[CamelExecutionException]

    }

    "enrich a message with the pollEnrich keyword and a reference to an aggregationStrategy" in new camel {

      import io.xtech.babel.camel.builder.RouteBuilder

      // pending


      val routeDef = new RouteBuilder {
        from("direct:input").
          pollEnrichRef("seda:enrichRoute", "aggregationStrategy", 1000).
          to("mock:output")
      }

      val registry = new SimpleRegistry
      registry.put("aggregationStrategy",
        new ReduceBodyAggregationStrategy[String]((a, b) => a + b))
      camelContext.setRegistry(registry)

      routeDef.addRoutesToCamelContext(camelContext)
      camelContext.start()

      val mockEndpoint = camelContext.mockEndpoint("output")
      val enricherMockEndpoint = camelContext.mockEndpoint("enricher")
      enricherMockEndpoint.returnReplyBody(new SimpleBuilder("123"))

      mockEndpoint.expectedBodiesReceived("bla123")

      val producer = camelContext.createProducerTemplate()
      producer.sendBody("seda:enrichRoute", "123")
      producer.sendBody("direct:input", "bla")

      mockEndpoint.assertIsSatisfied()
    }

    "enrich a message with the pollEnrich keyword and an instance of an aggregationStrategy" in new camel {

      import io.xtech.babel.camel.builder.RouteBuilder

      //pending


      val aggregationStrategy = new ReduceBodyAggregationStrategy[String]((a, b) => a + b)

      val routeDef = new RouteBuilder {
        from("direct:input").
          pollEnrichAggregation("seda:enrichRoute", aggregationStrategy, 1000).
          to("mock:output")
      }

      routeDef.addRoutesToCamelContext(camelContext)
      camelContext.start()

      val mockEndpoint = camelContext.mockEndpoint("output")
      val enricherMockEndpoint = camelContext.mockEndpoint("enricher")
      enricherMockEndpoint.returnReplyBody(new SimpleBuilder("123"))

      mockEndpoint.expectedBodiesReceived("bla123")

      val producer = camelContext.createProducerTemplate()
      producer.sendBody("seda:enrichRoute", "123")
      producer.sendBody("direct:input", "bla")

      mockEndpoint.assertIsSatisfied()
    }

    "enrich a message with the pollEnrich keyword and a aggregationFunction" in new camel {

      import io.xtech.babel.camel.builder.RouteBuilder

      //pending

      val routeDef = new RouteBuilder {
        from("direct:input").
          pollEnrich("seda:enrichRoute", (a, b: Any) => s"$a${b.toString}", 1000).
          to("mock:output")
      }

      routeDef.addRoutesToCamelContext(camelContext)
      camelContext.start()

      val mockEndpoint = camelContext.mockEndpoint("output")
      val enricherMockEndpoint = camelContext.mockEndpoint("enricher")
      enricherMockEndpoint.returnReplyBody(new SimpleBuilder("123"))

      mockEndpoint.expectedBodiesReceived("bla123")

      val producer = camelContext.createProducerTemplate()
      producer.sendBody("seda:enrichRoute", "123")
      producer.sendBody("direct:input", "bla")

      mockEndpoint.assertIsSatisfied()
    }

  }
}

You may also use Camel or Babel AggregationStrategy to define how the incomming message and the enriched one are merged:

val routeDef = new RouteBuilder {
  from("direct:input").
    pollEnrichRef("seda:enrichRoute", "aggregationStrategy", 1000).
    to("mock:output")
}

val registry = new SimpleRegistry
registry.put("aggregationStrategy",
  new ReduceBodyAggregationStrategy[String]((a, b) => a + b))
camelContext.setRegistry(registry)
val aggregationStrategy = new ReduceBodyAggregationStrategy[String]((a, b) => a + b)

val routeDef = new RouteBuilder {
  from("direct:input").
    pollEnrichAggregation("seda:enrichRoute", aggregationStrategy, 1000).
    to("mock:output")
}
val routeDef = new RouteBuilder {
  from("direct:enricherRoute").to("mock:enricher")

  from("direct:input").
    //enriches the input with the enricherRoute messages
    //  using the aggregationStrategy
    enrichRef("direct:enricherRoute", "aggregationStrategy").
    to("mock:output")
}

val registry = new SimpleRegistry
registry.put("aggregationStrategy",
  //the used aggregation strategy is stored in a registry
  new ReduceBodyAggregationStrategy[String]((a, b) => a + b))
camelContext.setRegistry(registry)
val aggregationStrategy = new ReduceBodyAggregationStrategy[String]((a, b) => a + b)

val routeDef = new RouteBuilder {
  from("direct:enricherRoute").to("mock:enricher")

  from("direct:input").
    //enriches the input with the enricherRoute messages
    //  using the aggregationStrategy
    enrich("direct:enricherRoute", aggregationStrategy).
    to("mock:output")
}

Warning

It’s not recommended to us the enrich and pollEnrich keywords with the io.xtech.babel.camel.model.FoldBodyAggregationStrategy. The only supported Aggregation strategy are io.xtech.babel.camel.model.ReduceBodyAggregationStrategy and custom implementations of the org.apache.camel.processor.aggregate.AggregationStrategy Interface.

Processors

You can transform a message including your own business logic. Such data transformation may be defined either by a function or using a bean. The functional way is always preferred in the Babel philosophy.

With a function

You can transform a message with a function.

The processBody keyword works on message bodies.

import io.xtech.babel.camel.builder.RouteBuilder

val routeDef = new RouteBuilder {
  //message bodies are converted to String if required
  from("direct:input").as[String].
    //processBody concatenates received string with "bli"
    processBody(JavaProcessors.append(_)).
    //sends the concatenated string to the mock endpoint
    to("mock:output")
}

import io.xtech.babel.camel.builder.RouteBuilder

val routeDef = new RouteBuilder {
  //message bodies are converted to String if required
  from("direct:input").as[String].
    //processBody concatenates received string with "bli"
    processBody(string => string + "bli").
    //sends the concatenated string to the mock endpoint
    to("mock:output")
}

The process keyword works on messages.

import io.xtech.babel.camel.builder.RouteBuilder

val routeDef = new RouteBuilder {
  //message bodies are converted to String if required
  from("direct:input").as[String].
    //process redefines a new Message with Body
    process(msg => msg.withBody(_ + "bli")).
    //sends the concatenated string to the mock endpoints
    to("mock:output")
}

Note

Babel provides a minimal API to modify Message or Body you are dealing with in your transformation

  • withBody creates a copy of the current Message and let you define how its Body is transformed (using a function)
//append a string to the message's body
val newMsg = message.withBody(_ + "bli")
  • withHeader creates a copy of the current Message and let you add a new Header (using a two arguments: key and value)
val newMsg = message.withHeader("c", 42)
  • withHeaders creates a copy of the current Message and let you define how its Headers is transformed (using a function)
//add headers to the message
val newMsg = message.withHeaders(headers => headers ++ Map("c" -> 42, "d" -> "dd"))
//replace headers by only one
val newMsg = message.withHeaders(headers => Map("c" -> 42))
  • exchange to access directly to the wrapped Camel Exchange.

It also provides methods concerning the Exchange properties, exceptions and MessageExchangePattern.

With a Bean

You can transform a message with a bean (using camel way to handle beans)

Warning

This keyword will remove type safety for the rest of your route, thus it has been deprecated and might disappear if no user does require it.

With a reference in Camel registry (or in Spring Context):

val routeDef = new RouteBuilder {
  from("direct:input").
    //bean keyword is deprecated!
    bean("myBean").to(mockProducer)
}
val routeDef = new RouteBuilder {
  from("direct:input").
    //the received message are provided to the "doIt" method
    //   of the class with bean id "myBean"
    //bean keyword is deprecated!
    bean("myBean", "doIt").
    //the bean keyword destroys the type of the next keyword
    to(mockProducer)
}

With an instance:

val routeDef = new RouteBuilder {
  from("direct:input").
    //the received message are provided to
    //   the TestBean class method which corresponds
    //bean keyword is deprecated!
    bean(new TestBean).
    to(mockProducer)
}

With a class:

val routeDef = new RouteBuilder {
  from("direct:input").
    //the received message are provided to
    //   the TestBean class method which corresponds
    //bean keyword is deprecated!
    bean(classOf[TestBean]).
    to(mockProducer)
}