Apache Camel is a popular integration framework, widely used for ETL, with a large catalog of components. Camel has a long and storied history and is just awesome all around. The theory behind how it all works is neat: data flows from producers to consumers, and both producers and consumers are represented as endpoints. Along the way, the data is manipulated by a set of constructs called enterprise integration patterns (EIPs). You can buy the reference book about EIPs here, and here is a succinct doc on them on the Camel website.
What you really need to take away from this introduction is that if your system has a component in Camel's component list, you can safely assume you can route all sorts of data to it.
When I joined Splunk two years ago, I played around one night and sent the Camel team a pull request to add support for the Splunk HTTP Event Collector (HEC). I have to say, my code contribution wasn't all that good. Thankfully, someone else came along after me and fixed it, so it's in good shape now. We're going to use this component today.
When you configure Camel, you define routes along which data transits. Here, we want to send data to Splunk. To make this interesting, we can reprise Jose's example, where he used the power of Splunk to read and analyze GitHub stats. This time, we would like to read GitHub releases.
GitHub releases are available as an Atom feed. For my day job I run a node of Geth, an Ethereum client, so I'd like to know when a new release is available.
First, I start with the URL of the Atom feed:
https://github.com/ethereum/go-ethereum/releases.atom
Since this endpoint is handled by the Atom component, I need to say so by prefixing the URL with the atom scheme:
atom:https://github.com/ethereum/go-ethereum/releases.atom
The Atom component can split the feed into individual entries, so I don't read 10 releases at once as a single message. This is done by adding splitEntries=true to the query string of the endpoint:
atom:https://github.com/ethereum/go-ethereum/releases.atom?splitEntries=true
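To make the anatomy of an endpoint URI concrete, here is a small Kotlin sketch that splits such a string into its scheme (the component name), the component-specific remainder, and the query options. EndpointUri and parseEndpointUri are hypothetical names for illustration only: Camel does its own URI parsing, and this simplified version ignores corner cases such as Camel's RAW(...) option values.

```kotlin
import java.net.URLDecoder

// Hypothetical model of a Camel-style endpoint URI: scheme:remainder?key=value&key=value
data class EndpointUri(val scheme: String, val remainder: String, val options: Map<String, String>)

// Illustrative parser only; Camel itself resolves components and options differently.
fun parseEndpointUri(uri: String): EndpointUri {
    // The scheme (component name) is everything before the first ':'
    val colon = uri.indexOf(':')
    require(colon > 0) { "missing scheme in $uri" }
    val scheme = uri.substring(0, colon)
    val rest = uri.substring(colon + 1)
    // Everything before the first '?' is component-specific (a feed URL, host/token, ...)
    val remainder = rest.substringBefore('?')
    // Everything after it is a list of key=value options separated by '&'
    val query = rest.substringAfter('?', "")
    val options = query.split('&')
        .filter { it.isNotEmpty() }
        .associate { pair ->
            val key = pair.substringBefore('=')
            val value = pair.substringAfter('=', "")
            key to URLDecoder.decode(value, "UTF-8")
        }
    return EndpointUri(scheme, remainder, options)
}
```

The same function applied to the Splunk endpoint used below yields the scheme splunk-hec, the remainder splunk-host/splunk-token, and five options.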
Now, we want to send data to Splunk. The endpoint for Splunk is:
splunk-hec:splunk-host/splunk-token?skipTlsVerify=false&index=myindex&source=myindex&sourcetype=mysourcetype&bodyOnly=true
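A few notes: splunk-host and splunk-token are placeholders for your HEC host (and port) and your HEC token; index, source and sourcetype set the metadata of the events; skipTlsVerify=false keeps TLS certificate verification on; and bodyOnly=true sends only the message body as the event.
Putting the two endpoints together, the route looks like this: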
from("atom:https://github.com/ethereum/go-ethereum/releases.atom?splitEntries=true").to("splunk-hec:splunk-host/splunk-token?skipTlsVerify=false&index=myindex&source=myindex&sourcetype=mysourcetype&bodyOnly=true");
This gets you to a working route.
Two years later, I had a few more hours to play with Apache Camel, and I wanted to see how far I could take a Camel pipeline to make it as easy as possible to run a tool that ingests multiple Atom feeds.
I took an unconventional approach: I decided to write the code in Kotlin, allow configuration through a TOML file, and, best of all, compile everything to run as a native program.
The result is on GitHub. Please take it for what it is: an experiment to test out a novel packaging method that produces a single binary.
Compiled down to a native executable, the program is 75 megabytes. It takes a single argument: the path to a configuration file.
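Because a route is fully described by two endpoint strings, ingesting several feeds mostly comes down to generating one route per feed from the configuration. The sketch below assumes a hypothetical, already-parsed configuration shape (ScraperConfig, SplunkSettings, routesFor are illustrative names); the repository's real TOML schema and code may differ. In a Camel RouteBuilder, each pair would then become a from(first).to(second) call.

```kotlin
// Hypothetical Splunk HEC settings, as they might be read from a TOML file.
data class SplunkSettings(
    val host: String,       // HEC host, optionally host:port
    val token: String,      // HEC token
    val index: String,
    val source: String,
    val sourcetype: String
)

// Hypothetical top-level configuration: a list of Atom feed URLs plus one Splunk target.
data class ScraperConfig(val feeds: List<String>, val splunk: SplunkSettings)

// Builds one (from, to) endpoint pair per feed, following the URI formats shown earlier.
fun routesFor(config: ScraperConfig): List<Pair<String, String>> {
    val s = config.splunk
    val to = "splunk-hec:${s.host}/${s.token}?skipTlsVerify=false" +
        "&index=${s.index}&source=${s.source}&sourcetype=${s.sourcetype}&bodyOnly=true"
    return config.feeds.map { feed -> "atom:$feed?splitEntries=true" to to }
}
```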
For fun, I created a Docker Compose environment in the repository that you can run to see it in action. It runs Splunk alongside the program, which sends it a few chosen Atom feeds of GitHub releases.
The issue with a simple route (from -> to) is that it doesn't preserve the state of what was consumed. If we stop and restart the route, we might send the same data to Splunk twice. Camel defines the concept of an idempotent consumer to prevent this, and offers several implementations.
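The idea behind an idempotent consumer fits in a few lines: derive a key from each message, and let the message through only if that key has not been seen before. The class below is a hypothetical, in-memory illustration, not Camel's implementation (Camel ships its own repositories, such as MemoryIdempotentRepository). Because its state lives only in memory, a restarted process starts with an empty set and would forward everything again.

```kotlin
// Hypothetical in-memory idempotent filter; keyOf extracts the deduplication key.
class InMemoryIdempotentFilter<T>(private val keyOf: (T) -> String) {
    private val seen = mutableSetOf<String>()

    // Keeps only messages whose key is new; add() returns false for keys already recorded.
    fun filter(messages: List<T>): List<T> = messages.filter { seen.add(keyOf(it)) }
}
```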
Being a hopeless romantic (I just love how it scales), I used their Infinispan flavor, backed by a local RocksDB database, as the cache here.
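To see why a persistent store matters, here is a hypothetical stand-in for the Infinispan/RocksDB cache that keeps processed keys in a plain text file and reloads them at startup, so a restarted process still skips entries it already sent. It is only a sketch of the principle (no concurrency control, no expiry, and it assumes keys contain no line breaks); Camel also ships a FileIdempotentRepository if a file-based store is enough for your needs.

```kotlin
import java.nio.file.Files
import java.nio.file.Path
import java.nio.file.StandardOpenOption

// Hypothetical file-backed key store: accepted keys are appended to a text file,
// and the file is read back when a new instance is created (e.g. after a restart).
class FileBackedIdempotentFilter(private val file: Path) {
    private val seen: MutableSet<String> =
        if (Files.exists(file)) Files.readAllLines(file).toMutableSet() else mutableSetOf()

    // Returns true the first time a key is seen, across process restarts.
    fun accept(key: String): Boolean {
        if (!seen.add(key)) return false // already processed, possibly in an earlier run
        Files.write(file, listOf(key), StandardOpenOption.CREATE, StandardOpenOption.APPEND)
        return true
    }
}
```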
To set up Infinispan in Kotlin, I initialize the cache like so:
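import org.apache.camel.component.infinispan.embedded.InfinispanEmbeddedConfiguration
import org.apache.camel.component.infinispan.embedded.InfinispanEmbeddedIdempotentRepository
import org.infinispan.configuration.cache.ConfigurationBuilder
import org.infinispan.configuration.global.GlobalConfigurationBuilder
import org.infinispan.manager.DefaultCacheManager
import org.infinispan.persistence.rocksdb.configuration.RocksDBStoreConfigurationBuilder
// Cache configuration persisted in a local RocksDB store; `path` is the data directory (a java.nio.file.Path) defined elsewhere
val cacheConfig = ConfigurationBuilder().persistence().addStore(RocksDBStoreConfigurationBuilder::class.java)
    .location(path.resolve("idempotent").toAbsolutePath().toString())
    .expiredLocation(path.resolve("idempotent-expired").toAbsolutePath().toString()).build()
val infinispanConfig = InfinispanEmbeddedConfiguration()
// Start an embedded cache manager whose default cache configuration is the RocksDB-backed one
val cacheManager = DefaultCacheManager(GlobalConfigurationBuilder().defaultCacheName("api-scraper").build(), cacheConfig, true)
infinispanConfig.cacheContainer = cacheManager
val repo = InfinispanEmbeddedIdempotentRepository("atom") // idempotent repository stored in the "atom" cache
repo.configuration = infinispanConfig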
Then, between from and to, we add:
.idempotentConsumer(simple("\${body.id.toASCIIString}"), repo)
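The Simple expression ${body.id.toASCIIString} evaluates to the ID of the current Atom entry as an ASCII string. That value becomes the key of the entry in the cache, so an entry whose key is already stored is dropped instead of being sent to Splunk again.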
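The key expression can be mimicked outside Camel to see what ends up in the cache. In the sketch below, FeedEntry and idempotencyKey are hypothetical names, and java.net.URI stands in for the entry's ID type (an assumption; it simply offers a toASCIIString() method with the same purpose). Editing an entry's title does not change its key, and non-ASCII characters are percent-encoded so the key stays plain ASCII.

```kotlin
import java.net.URI

// Hypothetical stand-in for a parsed Atom entry; the real body type comes from the Atom component.
data class FeedEntry(val id: URI, val title: String)

// Mirrors ${body.id.toASCIIString}: the entry ID rendered as a US-ASCII string.
fun idempotencyKey(entry: FeedEntry): String = entry.id.toASCIIString()
```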
See it in action in a few lines of this Main class.
I hope you enjoyed reading this blog post. If you have any questions or comments about the code, feel free to star the repository and open issues on it. If you are interested in learning more about Apache Camel, please visit their website and join their mailing lists to get acquainted with their community.
— Antoine Toulme, Senior Engineering Manager, Blockchain & DLT