I was preparing to attend the CamelOne conference, and since it had been a while since I had used Apache Camel, I thought I would take it for a spin. For those of you who don't know Apache Camel, it is the premier open source integration framework, and it implements all of the Enterprise Integration Patterns. For a good overall introduction I would recommend the following post. I would also highly recommend the Camel in Action book.

This desire to use Camel happened to coincide with a problem I was attempting to solve at my current client. There, we have what I will call poor man's performance monitoring via log4j-based aspects. These aspects start and stop a timer on each annotated method invocation and write a record to our log files. We were running our performance tests, and I wanted to parse the log file and write the records to a database table so that we could analyze the results.

Apache Camel supports Java, Spring XML, and Scala DSLs, among others. As a recent convert to Scala, I chose the Scala DSL. The process I was trying to model needed to accomplish the following:

  1. Read a file from the file system.
  2. Parse each line of the file into the individual pieces and parts that are to be stored into the database.
  3. Load the resulting lines into the table.

To accomplish this, I used the following components:

  1. File processor with a Splitter
  2. Custom processor to parse each line.
  3. JDBC Processor

To implement a custom processor, you need to implement the Processor interface. Because I'm using Scala and just constructing an inline app, the function definition is as follows:

 val processor = (exchange: Exchange) => {
 // Get the incoming line from the message exchange
 val line = exchange.getIn.getBody(classOf[String])
 // Remove the first part of the log message
 val lineSplit = line.split("\\[\\]\\: ")
 // Break it down into the individual components
 val split = lineSplit(1).split(" ")
 // Build the insert statement
 val x = MessageFormat.format(format, split(0), split(1), split(2), split(3), split(4), split(5), split(7), split(10))
 // Replace the incoming message with the insert statement
 exchange.getIn.setBody(x)
 }
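
The parsing step can also be tried on its own, without Camel. The following is a minimal sketch under stated assumptions: the table name, the column names, and the object and method names are hypothetical, since the post does not show the real `format` string or the schema. It keeps the same split logic and field indexes as the processor above, but returns `None` for lines that do not match the expected layout instead of failing with an index error.

```scala
import java.text.MessageFormat

object LogLineParser {
  // Hypothetical table and column names; the real schema is not shown in the post.
  // MessageFormat treats a single quote as an escape character, so each
  // literal quote in the SQL is written as two single quotes.
  val format: String =
    "insert into perf_log (c0, c1, c2, c3, c4, c5, c7, c10) " +
      "values (''{0}'', ''{1}'', ''{2}'', ''{3}'', ''{4}'', ''{5}'', ''{6}'', ''{7}'')"

  // Builds the insert statement for one log line, or None when the line
  // lacks the "[]: " marker or has fewer than 11 space-separated fields.
  def toInsert(line: String): Option[String] = {
    // Remove the first part of the log message
    val lineSplit = line.split("\\[\\]: ")
    if (lineSplit.length < 2) None
    else {
      // Break the remainder down into its individual fields
      val f = lineSplit(1).split(" ")
      if (f.length < 11) None
      else Some(MessageFormat.format(format, f(0), f(1), f(2), f(3), f(4), f(5), f(7), f(10)))
    }
  }
}
```

For example, `LogLineParser.toInsert("INFO perf []: a b c d e f g h i j k")` yields a statement whose values are `'a', 'b', 'c', 'd', 'e', 'f', 'h', 'k'`. Note that building SQL by string formatting breaks as soon as a field contains a quote, which is acceptable for a one-off load of trusted log files but not much beyond that.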

Next up, I need to establish the database connection so that the JDBC component can insert the records. I used Apache Commons DBCP to set up the connection pool and registered it with Camel:

 // Build the connection pool
 val ds = new BasicDataSource
 // Register it with Camel
 val reg = new SimpleRegistry 
 reg.put("dataSource", ds)

The next part of the exercise is to set up the Camel route builder:

 // Create the Camel Context
 val context = new DefaultCamelContext(reg)
 // Build the Route
 context.addRoutes(new RouteBuilder {
 // Read the file(s) from the directory
 "file:perf" ==> {
 // Split the file up at each line
 split(_.getIn().getBody(classOf[String]).split("\n")) {
 // Then further split up the processing across multiple parsers
 loadbalance roundrobin {
 // Reference to my internal components
 // Join back the results from above
 // then split it out again turning off commit level changes
 }.loadbalance roundrobin {
 // Setup my internal processors with references to my custom component
 "direct:x" process (processor)
 "direct:y" process (processor)
 "direct:z" process (processor)

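To make the split and round-robin steps in the route more concrete, here is a plain-Scala sketch of the idea with no Camel dependency. It is not how Camel implements `loadbalance roundrobin`; the object and method names are hypothetical, and the endpoint names are simply the `direct:` URIs used above. It only shows which lines each of the three processors would receive.

```scala
object RoundRobinSketch {
  // Split a file body into lines, mirroring the route's split expression.
  def splitLines(body: String): Seq[String] = body.split("\n").toSeq

  // Hand each line to the next endpoint in rotation and collect,
  // per endpoint, the lines it would receive (in arrival order).
  def assign(lines: Seq[String], endpoints: Seq[String]): Map[String, Seq[String]] =
    lines.zipWithIndex
      .groupBy { case (_, i) => endpoints(i % endpoints.size) }
      .map { case (endpoint, pairs) => endpoint -> pairs.map(_._1) }
}
```

With four lines and the three endpoints `direct:x`, `direct:y`, and `direct:z`, the first and fourth lines go to `direct:x`, the second to `direct:y`, and the third to `direct:z`.
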
The last part of the exercise is to execute this from my main program:

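 // Start the camel process
 context.start()
 // Wait for the process to execute (the original wait is not shown; a fixed sleep is assumed here)
 Thread.sleep(10000)
 // Stop the camel process
 context.stop()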

I was impressed that with a little bit of code (fewer than 70 lines), I was able to process a 50 MB file in under 10 seconds. One note: the Scala builder does not support all of the features of the Java DSL. In particular, the splitter does not delete files after processing them, which required me to process one file at a time.

My next exercise is to use the SFTP component to automatically fetch the files for me.