My First Camel Ride, Apache Camel That Is!

by Ron DiFrango

I was preparing to attend the CamelOne conference, and since it had been a while since I had used Apache Camel, I thought I would take it for a spin.  For those of you who don’t know it, Apache Camel is the premier open source integration framework, and it implements all of the Enterprise Integration Patterns.  For a good overall introduction I would recommend the following post.  I would also highly recommend the Camel in Action book.

This desire to use Camel happened to coincide with a problem I was attempting to solve at my current client.  There, we have what I will call poor man’s performance monitoring via log4j-based aspects.  These aspects start and stop a timer on each annotated method invocation and write a record to our log files.  We were running our performance tests, and I wanted to parse the log file and write the records to a database table so that we could analyze the results.

Apache Camel supports Java, Spring XML, and Scala DSLs, among others.  As a recent convert to Scala, I chose the Scala DSL.  The process I was trying to model needed to accomplish the following:

  1. Read a file from the file system.
  2. Parse each line of the file into the individual pieces and parts that are to be stored into the database.
  3. Load the resulting lines into the table.

To accomplish this, I used the following components:

  1. File processor with a Splitter
  2. Custom processor to parse each line.
  3. JDBC Processor

To implement a custom processor, you normally need to implement the Processor interface.  Because I’m using Scala and I’m just constructing an inline app, I defined the processor as a function instead:

  import java.text.MessageFormat
  import org.apache.camel.Exchange

  // `format` is the insert statement template (a MessageFormat pattern)
  // defined elsewhere in the program
  val processor = (exchange: Exchange) => {
    // Get the incoming line from the message exchange
    val line = exchange.getIn.getBody(classOf[String])
    // Remove the first part of the log message (everything up to "[]: ")
    val lineSplit = line.split("\\[\\]\\: ")
    // Break the remainder down into its space-separated components
    val split = lineSplit(1).split(" ")
    // Build the insert statement from the fields we want to store
    val insertSql = MessageFormat.format(format, split(0), split(1), split(2), split(3), split(4), split(5), split(7), split(10))
    // Replace the incoming message with the insert statement
    exchange.getIn.setBody(insertSql)
  }
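To make the parsing step easier to follow outside of Camel, here is a minimal standalone sketch of the same idea.  The log line layout, the `perf_log` table, and the insert template are assumptions made up for illustration (the real template lives in `format` and is not shown here), and unlike the inline processor above, this version returns `None` for lines that don’t have the expected shape rather than throwing.

```scala
import java.text.MessageFormat

object LogLineToInsert {
  // Hypothetical insert template; the table name and column layout are assumptions.
  // MessageFormat needs '' to emit one literal single quote.
  val template: String =
    "INSERT INTO perf_log VALUES (''{0}'', ''{1}'', ''{2}'', ''{3}'', ''{4}'', ''{5}'', ''{6}'', ''{7}'')"

  // Turn one log line into an insert statement, or None if the line
  // does not contain the "[]: " marker followed by at least 11 fields.
  def toInsert(line: String): Option[String] = {
    // Drop everything up to and including the "[]: " marker
    val parts = line.split("\\[\\]: ")
    if (parts.length < 2) None
    else {
      // Break the remainder into space-separated fields
      val f = parts(1).split(" ")
      if (f.length < 11) None
      else Some(MessageFormat.format(template, f(0), f(1), f(2), f(3), f(4), f(5), f(7), f(10)))
    }
  }
}
```

For a made-up line such as `INFO PerfAspect []: f0 f1 f2 f3 f4 f5 f6 f7 f8 f9 f10`, this picks fields 0 through 5, 7, and 10, the same indexes the processor uses.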

Next up, I needed to establish the database connection so that the JDBC component could run the inserts.  I used Apache Commons DBCP to set up the connection pool and registered it with Camel:

  // Build the connection pool
  val ds = new BasicDataSource
  ds.setDriverClassName("oracle.jdbc.OracleDriver")
  ds.setUrl("jdbc:oracle:thin:@localhost:1521:xe")
  ds.setUsername("*****")
  ds.setPassword("*****")
  // Register it with Camel
  val reg = new SimpleRegistry 
  reg.put("dataSource", ds)

The next part of the exercise is to set up the Camel route builder:

  // Create the Camel Context
  val context = new DefaultCamelContext(reg)
  // Build the Route
  context.addRoutes(new RouteBuilder {
    // Read the file(s) from the directory
    "file:perf" ==> {
      // Split the file up at each line
      split(_.getIn().getBody(classOf[String]).split("\n")) {
        // Then further split up the processing across multiple parsers
        loadbalance roundrobin {
          // Reference to my internal components
          to("direct:x")
          to("direct:y")
          to("direct:z")
        }
      // Join back the results from above,
      // then split them out again, turning off commit-level changes
      }.loadbalance roundrobin {
        to("jdbc:dataSource?resetAutoCommit=false")
        to("jdbc:dataSource?resetAutoCommit=false")
        to("jdbc:dataSource?resetAutoCommit=false")
      }
      }
    }
 
    // Setup my internal processors with references to my custom component
    "direct:x" process (processor)
    "direct:y" process (processor)
    "direct:z" process (processor)
  })
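If the round-robin policy is unfamiliar, the following plain Scala sketch illustrates the idea of handing each split line to the next endpoint in turn.  It is only an illustration of the policy, not how Camel implements its load balancer, and the `RoundRobinSketch` object and `assign` function are hypothetical names.

```scala
object RoundRobinSketch {
  // Pair each item with a target, cycling through the targets in order.
  // Illustration only; Camel's load balancer handles this internally.
  def assign[A](items: Seq[A], targets: Seq[String]): Seq[(String, A)] = {
    require(targets.nonEmpty, "at least one target is required")
    items.zipWithIndex.map { case (item, i) =>
      (targets(i % targets.length), item)
    }
  }
}
```

With three targets, the first line goes to `direct:x`, the second to `direct:y`, the third to `direct:z`, and the fourth wraps around to `direct:x` again.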

The last part of the exercise is to execute this from my main program:

  // Start the camel process
  context.start
  // Wait for the process to execute
  Thread.sleep(10000)
  // Stop the camel process
  context.stop

I was impressed that with a little bit of code (less than 70 lines), I was able to process a 50MB file in under 10 seconds.  One note: the Scala builder does not support all of the features of the Java DSL.  In particular, the splitter does not delete files after processing them, which required me to process one file at a time.

My next exercise is to use the SFTP component to automatically fetch the files for me.