Using Kafka to Distribute and Dual-load Timeseries Data

This article was originally posted at http://engineering.conversantmedia.com/2015/08/24/using-kafka-to-distribute-and-dual-load-timeseries-data/

At Conversant, we love OpenTSDB.

Operations teams, software engineers, data scientists, QA, and even some on the business side scrutinize these colorful graphs daily. OpenTSDB allows us to effectively visualize the various metrics collected from our hundreds of servers across several data centers.

Numerous home-grown scripts have been developed to mine the data for engineering reports, alerting and quality through the RESTful API. We screen-scrape the PNGs for internal dashboards. More recently OpenTSDB has taken on the task of looping some of this data back into the production chain (much to the chagrin of engineering management working to replace this bit!).

Once a pet-project, a closely-guarded engineering-only tool, it has since grown into a respectable dozen-node cluster with strong HA SLAs persisting over 550 million data points per day.

Room for Improvement

Last year we set out to perform a major upgrade of the cluster. Load balancing was introduced (via HAProxy) and an external CNAME created to provide some measure of indirection and flexibility. The original hope was to build a brand new cluster from scratch and port the historical data over.

Unfortunately this wasn’t practical – it would take several days to copy the data and there was no easy way of back-filling the missed events once the initial copy had completed. Instead we opted to take the cluster down to upgrade the Hadoop stack (an older version that didn’t support rolling upgrades), and left the OS upgrade for later.

The nature of the metrics collection scripts – a loose assemblage of python and bash scripts – meant that ALL metrics collection would cease during this planned outage. The collection scripts would of course continue to run, but the events would simply be dropped on the floor when the cluster wasn’t available to persist them.

This was clearly less than ideal and an obvious candidate for enhancement.

Conversant needed a solution that would enable taking the cluster down for maintenance while continuing to buffer incoming events so they could be applied when work was complete. Additionally, I wanted to build a backup system for DR purposes and for A/B testing upgrades, or potential performance or stability enhancements at scale. A secondary instance would also be useful for protecting the primary cluster from “rogue” users making expensive requests while engineering is doing troubleshooting. The primary cluster would be kept “private” while the backup was exposed to a wider audience.

A Use Case for Kafka

This seemed like a perfect use for Kafka – a distributed, durable message queue/pub-sub system. In fact, this specific issue brought to mind an article I’d read a few months back on O’Reilly’s Radar blog by Gwen Shapira. In the article, Shapira discusses the benefits of inserting Kafka into data pipelines to enable things like double loading data for testing and validating different databases and models.

Kafka could be inserted into the flow – sending all metrics into Kafka where they would be buffered for consumption by the OpenTSDB cluster. Most of the time this would function in near-realtime with the cluster consuming the events nearly as fast as they are produced.

However, should the cluster become unavailable for any reason, Kafka will happily continue to buffer the events until service is restored. In addition, a separate backup cluster could be built and concurrently loaded by consuming the same event stream. In fact, nothing would prevent us from setting up separate Kafka topics for each component of the stack, enabling selective consumption of subsets of the metrics by alternative systems.
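The buffering and dual-loading behavior described above can be illustrated with a toy model. This is a minimal in-memory sketch, not the Kafka API: the ToyTopic class, the group names, and the sample events are all hypothetical. It only shows the idea that producers append to a shared log while each consuming cluster tracks its own read position.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy in-memory stand-in for a single topic; an illustration only, not Kafka. */
class ToyTopic {
    private final List<String> log = new ArrayList<>();
    // Each consumer group tracks its own read position in the shared log.
    private final Map<String, Integer> offsets = new HashMap<>();

    /** Producers only append; they never wait for a consumer. */
    void publish(String event) {
        log.add(event);
    }

    /** Returns everything the group has not yet seen and advances its offset. */
    List<String> poll(String group) {
        int from = offsets.getOrDefault(group, 0);
        List<String> batch = new ArrayList<>(log.subList(from, log.size()));
        offsets.put(group, log.size());
        return batch;
    }

    public static void main(String[] args) {
        ToyTopic topic = new ToyTopic();
        topic.publish("sys.cpu.user 1440000000 42 host=web01");
        System.out.println(topic.poll("primary-tsdb")); // primary keeps up: one event

        // The primary goes down for maintenance; events keep arriving.
        topic.publish("sys.cpu.user 1440000060 40 host=web01");
        topic.publish("sys.cpu.user 1440000120 43 host=web01");

        System.out.println(topic.poll("backup-tsdb"));  // backup reads all three events
        System.out.println(topic.poll("primary-tsdb")); // primary catches up on the two it missed
    }
}
```

Neither consumer affects the other's position, which is what makes a second, independently loaded cluster possible.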

The Conversant data team has been working with Apache Kafka since late last year, delivering and administering two production clusters. The largest of these handles billions of log records every day and has proven to be rock solid. There was no doubt the cluster could broker metric data reliably.

Simple topic/key design

The initial plan was to publish the TSDB put commands from the current scripts directly into Kafka via the kafka-console-producer. Though this was the easiest and fastest way to prove the idea out, it would negate some of the benefits of the abstraction.

A simple design was devised instead for a set of topics and keys to represent the metrics. Each component in the stack pushes to a separate component-specific topic. The metric name is used as the key for each message. The message payload is essentially everything else: the timestamp, tags, and metric value. For now these are left as the bare strings. A future enhancement may include packing these into a custom protocol buffers object or JSON structure.
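As a rough illustration of this layout, the sketch below splits an OpenTSDB-style put line into a message key (the metric name) and a bare-string payload (everything else). The MetricMessage class, the "metrics." topic prefix, and the sample component name are assumptions made for the example, not the names used in production.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.Map;

class MetricMessage {
    /**
     * Splits "put <metric> <timestamp> <value> <tags...>" into
     * key = metric name and value = the remaining text, left unparsed.
     */
    static Map.Entry<String, String> fromPutLine(String line) {
        String[] parts = line.trim().split("\\s+", 3);
        if (parts.length < 3 || !parts[0].equals("put")) {
            throw new IllegalArgumentException("Not a put command: " + line);
        }
        return new SimpleImmutableEntry<>(parts[1], parts[2]);
    }

    /** Hypothetical naming scheme: one topic per stack component. */
    static String topicFor(String component) {
        return "metrics." + component;
    }

    public static void main(String[] args) {
        Map.Entry<String, String> msg =
                fromPutLine("put sys.cpu.user 1440000000 42.5 host=web01 dc=chi");
        System.out.println("topic=" + topicFor("adserver")
                + " key=" + msg.getKey()
                + " value=" + msg.getValue());
    }
}
```

Keeping the payload as a plain string means a consumer can rebuild the original put command by prepending the key, at the cost of re-parsing if it needs individual fields.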


Future

By introducing HBase-level Snappy compression on the tsdb table, implementing a more practical TTL of 2 years, and performing a major compaction, there’s more than enough room to make it possible to split the single cluster into a separate primary and secondary. Other groups are already interested in tapping into this stream – either directly from Kafka or through the new “public” OpenTSDB service. Work on standardizing the metric collection and publishing process code will start soon, providing a more maintainable codebase to support future enhancements and growth. There’s even a possibility of tapping into the Kafka event streams directly using a custom consumer for things like monitoring for discrete critical events.

This new architecture provides greater flexibility, availability, and redundancy. At Conversant, we love OpenTSDB + Kafka.

On Becoming Functional

This article was originally posted at http://engineering.conversantmedia.com/2015/07/06/on-becoming-functional/

The final verdict may yet be out but the trend is quite clear – imperative style is old and busted, functional is the new black; at least as far as “big data” is concerned.

This isn’t a book review, though it was prompted by reading the concise and thought-provoking book “Becoming Functional” by Joshua Backfield. Backfield’s book does a good job of introducing functional concepts to imperative programmers and I definitely recommend this quick read to other Java developers considering making the transition. Conversant engineers will find a copy to lend sitting on my desk.

Concise code

XKCD: By the year 2019 all information will be communicated in this clear and concise format
xkcd.com “Tall Infographics”

I’ve been an imperative style coder for as long as I can remember (and far before I’d ever heard the term ‘imperative’), working almost exclusively in Java since well before Y2K. While I wouldn’t be shocked if over time I grew to find functional code easier to read and clearer in intent, at this point I have a difficult time appreciating what others apparently take as a given. Article after article pronounces functional code as obviously clearer – displaying code as if it’s self-evident that functional trumps imperative on readability. Although this certainly may be true in some – or even many – cases, I don’t find it universally so.

Another excellent book, Cay Horstmann’s “Scala for the Impatient,” follows suit. Take, for example, this passage from Horstmann’s book:

The Scala designers think that every keystroke is precious, so they let you combine a class with its primary constructor. When reading a Scala class, you need to disentangle the two.

I find this logic in some ways backward. Modern IDEs are exceptionally good at generating boilerplate automatically. This dramatically limits the amount of finger typing required already. By further compacting the syntax, what you save in upfront keystrokes may be paid out again by other developers (even you) in the time required to “disentangle the two.”

Reductio ad absurdum

Taken to the extreme, one could argue that a great way to cut down keystrokes – perhaps more than the concise constructor syntax – would be to limit name lengths. Why not keep our method names down to a single character?

class A ( b: String ) {
def c: Int = { ... }
}

This is clearly absurd and an unfair comparison, as no amount of language knowledge would provide hints as to the purpose of the method simply by viewing the API. However, my point is simply that conciseness != readability in every case – especially for the less experienced developer, or those new to a given language.

Recursion

The other area that concerns me is the preference for recursion over iteration. In order to maintain immutability, recursion becomes a necessity. However, it certainly isn’t natural for me to think and write recursively. I’ve had to spend significant time and effort to start to see recursion where iteration appears the obvious quick and easy way. Although I’m confident I can ultimately work effectively in this paradigm, I’m concerned this will significantly limit our pool of potential new recruits. I think Joel Spolsky puts it well in his Guerrilla Guide to Interviewing:

Whereas ten years ago it was rare for a computer science student to get through college without learning recursion and functional programming in one class and C or Pascal with data structures in another class, today it’s possible in many otherwise reputable schools to coast by on Java alone.

Take this example from Backfield (p59), the iterative solution:

public static int countEnabledCustomersWithNoEnabledContacts (
    List<Customer> customers){
 int total = 0
 for(Customer customer : customers) {
  if (customer.enabled) {
   if (customer.contacts.find({ contact -> contact.enabled}) == null) {
    total = total +1
   }
  }
 }
 return total
}

And the tail-recursive functional version:

def countEnabledCustomersWithNoEnabledContacts(customers : List[Customer],
    sum : Int) : Int = {
 if (customers.isEmpty) {
  return sum
 } else {
  val addition : Int = if(customers.head.enabled &&
                          (customers.head.contacts.count(contact =>
                            contact.enabled)) == 0) { 1 } else { 0 }
  return countEnabledCustomersWithNoEnabledContacts(customers.tail,
                                                    sum + addition)
 }
}

The ‘issue’ with the iterative solution per Backfield (and other functional evangelists) is the mutable variable ‘total’. Although I’ve been burned innumerable times by side effects, never can I recall something as generally benign as a mutable local counter causing hard-to-diagnose issues. Side effects due to mutability are a real problem – but my experience with these problems is limited to mutable collections with elements being changed, or member variables being updated in a method that doesn’t advertise the fact that it might happen. Fear of mutable method scoped primitives seems irrational to me.

And in the example code above I argue the cure is significantly worse than the disease. We’re replacing relatively straightforward iterative code with something that to me (and many I suspect) appears far less clear. This comes with the added benefit of running slower and being less efficient! Ah, but of course in languages like Scala the compiler takes care of replacing this recursion under the covers with what? Iteration! However, you only get the benefit of iterative processing if you know the proper way to structure your tail recursion. Fail at that and you wind up with a bloated stack.

Why on earth shouldn’t I just write it iteratively in the first place and save the compiler – and the next maintainer – the trouble?

Anonymous functions and long call chains

When reading books and articles on functional programming, one can’t help but run across examples including long chains of function calls or multiple levels of nested anonymous functions.

This particular line from Backfield (p88) is a good example of the kind of code that makes me reach for my reading glasses:

Customer.allCustomers.filter({ customer =>
    customer.enabled && customer.contract.enabled
}).foreach({ customer =>
    customer.contacts.foreach(cls)
})

Though not nested in this example the extra spaces and chains of calls with anonymous functions make it harder to immediately recognize that we’re merely applying a foreach to elements matching the filter. Obviously there’s no requirement to do this with anonymous functions – using named functions I think would help shorten up the line and make this a bit easier to instantly grasp. Functional programming however appears to promote and even prefer this style of coding. (Though perhaps that’s just functional programming literary examples?)
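To make the named-function suggestion concrete, here is a minimal sketch in Java rather than Scala. The Customer and Contract classes are hypothetical stand-ins for the book's types, contacts are reduced to plain strings, and the method names are my own invention.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

class NamedFunctions {
    // Hypothetical stand-ins for the book's Contract and Customer types;
    // contacts are plain strings here to keep the sketch short.
    static class Contract {
        final boolean enabled;
        Contract(boolean enabled) { this.enabled = enabled; }
    }

    static class Customer {
        final boolean enabled;
        final Contract contract;
        final List<String> contacts;
        Customer(boolean enabled, Contract contract, String... contacts) {
            this.enabled = enabled;
            this.contract = contract;
            this.contacts = Arrays.asList(contacts);
        }
    }

    /** The filter condition, given a name that states its intent. */
    static boolean hasActiveContract(Customer customer) {
        return customer.enabled && customer.contract.enabled;
    }

    /** Applies the action to every contact of every customer that passes the filter. */
    static void forEachActiveContact(List<Customer> customers, Consumer<String> action) {
        customers.stream()
                 .filter(NamedFunctions::hasActiveContract)
                 .forEach(customer -> customer.contacts.forEach(action));
    }

    public static void main(String[] args) {
        List<Customer> customers = Arrays.asList(
                new Customer(true, new Contract(true), "ann", "bob"),
                new Customer(true, new Contract(false), "cat"),
                new Customer(false, new Contract(true), "dan"));
        forEachActiveContact(customers, System.out::println); // prints ann, then bob
    }
}
```

The chain is the same filter-then-foreach, but the call site now reads as a single named operation and the condition can be scanned (and tested) on its own.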

You may argue that this really isn’t all that difficult to read – and I’m sure you’re not alone. My complaint however is not about the ability to read, but my ability to read fast. I find when I need to scan multiple lines of code with anonymous functions, I tend to ‘push’ the anonymous functions into my own internal ‘stack’. The longer the chain of calls and the more anonymous functions included, the larger the stack in my head – and the longer it takes to see the intent of the code as I pop each item off. And on this I’m certain that I’m not alone (see Miller’s Law).

Add a handful of these constructs into a single section of code and this can significantly slow down troubleshooting, debugging, and code reviews.

…and please don’t get me started on currying.

Debugging is twice as hard as writing the code in the first place. Therefore, if you write the code as cleverly as possible, you are, by definition, not smart enough to debug it. – Brian Kernighan

What next?

xkcd: Tail recursion is its own reward.
xkcd.com “functional”

And after all that…I do find functional style compelling and interesting. There is a sort of beauty to well-crafted code in general and functional code in particular. Much of the work I did a few years back in the web space using client-side (and some server-side) JavaScript involved functional coding. And I admit it can be fun and somewhat addictive.

So where to go from here? As I said at the outset, functional is definitely the new coolness in “big data” and we ignore it at our own peril. Many of the tools we’re using today were developed in functional languages like Scala or Clojure. I’m just not ready to commit wholesale to doing things exclusively (or even mostly) the “functional way”. To this end, my team has been dabbling in Scala, writing unit and functional tests using the excellent ScalaTest suite. The more I work with Scala, the more I like it…and the easier it gets to fully grok the Kafka code I frequently find myself digging into.

With time perhaps many if not all of my concerns will be proven unfounded – or at least mitigated effectively. Internally we’ll continue to promote efforts to absorb functional concepts and incorporate the best bits into our everyday engineering efforts.

Scala has gained significant traction on my team and within the Conversant engineering organization. The plan is to continue to drive this process to discover how far it takes us in becoming more functional.

MapReduce: Joining Multiple Avro Files Using Custom Serialization

Recently at work I was tasked with developing a number of MapReduce jobs involving Avro files. The final step of the process was a join between Avro and tab-delimited text files. Because the final output consisted of the original Avro records updated with information from the flat files, I decided to convert the text files into Avro in the mapper and then send them on for joining in the reduce phase.

Here’s where I ran into some trouble.

AvroWrapper, AvroKey, AvroValue

Although Avro is well supported in Hadoop MapReduce, the design is rather limiting. A typical Writable bean is capable of performing serde operations on itself. This design is clean, compact, and makes it relatively easy to implement your own custom writables. However, Avro records require a schema to be deserialized. In order to eliminate the need to include the schema with every individual Avro record, the library includes an AvroWrapper, AvroKey, and AvroValue. The AvroWrapper, as it sounds, merely wraps an individual record or datum. You don’t use the wrapper directly, however. When specifying a key or value in your job, you must select AvroKey for keys and AvroValue for values (duh, right?).

If you look into these two subclasses you’ll discover something curious — they’re essentially empty.

The reason for this is that serde operations on these classes are handled by the AvroSerialization class as configured through the AvroJob static helper methods. The reader and writer schemas may then be registered with the serialization class, and the AvroKey and AvroValue wrappers act merely to tag your records and identify which schema to use. This works quite well for most jobs; however, it leaves us limited to one set of schemas for keys and another for values. But what if we want to perform a join operation on two or more Avro records with different schemas?

AvroMultiWrapper and MultiSchemaAvroSerialization

To work around this limitation, a new serialization implementation was needed. Clearly we couldn’t embed the schema in with each key, as this would explode the volume of data flowing through the job. I also refused to add separate tagging interfaces for each schema – this would mean creating custom wrapper extensions for each record type for this job and future jobs using the library.

What if, however, you could tag each record with the appropriate reader schema for deserialization? Limiting this tag to a single byte would keep the overhead low while supporting up to 128 different schemas.

The design mirrors the standard Avro mapreduce libraries. To configure schemas in the system, you call the static MultiSchemaAvroSerialization.registerSchemas method which populates the set of schemas:

public static void registerSchemas(Job job, Schema...schemas) {
	String[] names = new String[schemas.length];
	int idx = 0;
	for (Schema schema : schemas) {
		names[idx++] = schema.getFullName();
	}
	job.getConfiguration().setStrings(CONF_KEY_MULTI_SCHEMAS, names);
	registerSerialization(job);
}

For your map output key and/or value you’d specify the AvroMultiWrapper and that’s it.

When the key and/or value datum is serialized, the MultiSchemaAvroSerialization will first look up the ordinal of the schema from the registered set and include that as the first byte of the serialized datum:

public void serialize(AvroMultiWrapper<T> avroWrapper) throws IOException {
	DatumWriter<T> writer = datumWriterFor((Class<T>) avroWrapper.datum().getClass());
	// Look up the ordinal of the datum's schema and write it as the leading tag byte
	int b = MultiSchemaAvroSerialization.getIndexForSchema(getConf(), avroWrapper.datum().getClass());
	outputStream.write(b);
	// The datum itself follows, encoded as usual
	writer.write(avroWrapper.datum(), encoder);
	this.encoder.flush();
}

The same thing happens only in reverse for deserializing our wrapper:

public AvroMultiWrapper<T> deserialize(AvroMultiWrapper<T> wrapper)
		throws IOException {
	if (wrapper == null) {
		wrapper = new AvroMultiWrapper<T>();
	}

	// Read in the first byte - the schema index
	int schemaIndex = decoder.inputStream().read();

	// Now hand off the rest to the datum reader for normal deser.
	DatumReader<T> reader = datumReaderFor(schemaIndex);
	wrapper.datum(reader.read(wrapper.datum(), decoder));
	return wrapper;
}
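
The tagging scheme itself can be tried in isolation, without Avro or Hadoop on the classpath. The following is a simplified sketch under stated assumptions: TaggedSerde is a hypothetical class, the "datum" is just a UTF-8 string, and the schema names are made up. It shows only the one-byte index prefix and the lookup against an ordered list of registered names.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;

class TaggedSerde {
    // Stand-in for the schema names held in the job configuration, in registration order.
    private final List<String> schemaNames;

    TaggedSerde(String... names) {
        // Follows the limit stated in the text for a single-byte tag.
        if (names.length > 128) {
            throw new IllegalArgumentException("At most 128 schemas are supported");
        }
        this.schemaNames = Arrays.asList(names);
    }

    /** Prefixes the encoded datum with the ordinal of its registered schema. */
    byte[] serialize(String schemaName, String datum) {
        int index = schemaNames.indexOf(schemaName);
        if (index < 0) {
            throw new IllegalArgumentException("Unregistered schema: " + schemaName);
        }
        byte[] payload = datum.getBytes(StandardCharsets.UTF_8);
        ByteArrayOutputStream out = new ByteArrayOutputStream(payload.length + 1);
        out.write(index);                      // the one-byte schema tag
        out.write(payload, 0, payload.length); // the datum itself
        return out.toByteArray();
    }

    /** Reads the leading tag byte and maps it back to a schema name. */
    String schemaOf(byte[] bytes) {
        return schemaNames.get(bytes[0] & 0xFF);
    }

    /** Returns everything after the tag byte. */
    String payloadOf(byte[] bytes) {
        return new String(bytes, 1, bytes.length - 1, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        TaggedSerde serde = new TaggedSerde("com.example.Click", "com.example.Impression");
        byte[] bytes = serde.serialize("com.example.Impression", "id=7");
        System.out.println(serde.schemaOf(bytes) + " -> " + serde.payloadOf(bytes));
    }
}
```

Because the tag is an index into the registered list, the writer and every reader must be configured with the same schemas in the same order.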

I’ve used this successfully in conjunction with the standard AvroSerialization and AvroKey/AvroValue classes when inputting from Avro files and then emitting multiple schemas from the Mapper class. I’ve not benchmarked performance of this as there wasn’t really an alternative to this process for my job – however the only additional overhead here should be the additional byte. The schema lookups should cost basically the same in the “standard” serialization and the multi.

Please take a look at the code and let me know what you think!

Custom Hamcrest Matchers for Testing HBase Puts

A fairly common pattern for my work these days involves ETL type jobs using MapReduce. These operate on simple flat-file inputs and, after some fairly basic transformation steps, emit the results into one or more HBase tables. For my initial job, I used MRUnit as a tool for a test-driven development process. I then proceeded to develop a series of unit tests utilizing Hamcrest’s excellent built-in matchers. This all worked well enough initially; however, the resulting test cases soon grew into a rather complex (and brittle) mess of method calls and iterations.

For example, to find out if we had issued a “put” for a given column and value, we might use something like the following:

byte[] columnFamily = "a".getBytes();
byte[] columnQualifier = "column1".getBytes();
byte[] expectedValue = "value".getBytes();

List<KeyValue> kvList = put.get(columnFamily, columnQualifier);
boolean found = false;
for (KeyValue kv : kvList) {
	byte[] actual = kv.getValue();
	if (Arrays.equals(actual, expectedValue)) {
		found = true;
		break;
	}
}
assertTrue(found);

In an effort to clean this up, I searched around for some HBase-specific matchers, and finding none, decided to develop and contribute my own to the cause.

RowKeyMatcher.java

Row keys, like all of the values stored in HBase, are persisted as a byte array. These values frequently are better – and more usefully represented – in a different data type. The row key matcher handles these conversions for you, making for cleaner and more readable test code. The RowKeyMatcher is an extension of the hamcrest FeatureMatcher, making for a pretty simple class:

public class RowKeyMatcher<T> extends FeatureMatcher<Mutation, T> {

	public static final String NAME = "Put Row Key";

	public static final String DESCRIPTION = "row key";

	private final Class<T> valueClass;

	public RowKeyMatcher(Matcher<? super T> subMatcher, Class<T> valueClass) {
		super(subMatcher, NAME, DESCRIPTION);
		this.valueClass = valueClass;
	}

	/*
	 * (non-Javadoc)
	 * @see org.hamcrest.FeatureMatcher#featureValueOf(java.lang.Object)
	 */
	@Override
	protected T featureValueOf(Mutation mutation) {
		byte[] bytes = mutation.getRow();
		return (T)valueOf(bytes, this.valueClass);
	}

	public <T> T valueOf(byte[] bytes, Class<? extends T> valueClass) {
		if (byte[].class.equals(valueClass)) {
			return (T)bytes;
		}
		else if (String.class.equals(valueClass)) {
			return (T)Bytes.toString(bytes);
		}
		else if (Long.class.equals(valueClass)) {
			return (T)Long.valueOf(Bytes.toLong(bytes));
		}
		else if (Double.class.equals(valueClass)) {
			return (T)Double.valueOf(Bytes.toDouble(bytes));
		}
		else if (Float.class.equals(valueClass)) {
			return (T)Float.valueOf(Bytes.toFloat(bytes));
		}
		else if (Integer.class.equals(valueClass)) {
			return (T)Integer.valueOf(Bytes.toInt(bytes));
		}
		else if (Short.class.equals(valueClass)) {
			return (T)Short.valueOf(Bytes.toShort(bytes));
		}
		return null;
	}
}

It may be used as follows:

assertThat(put, hasRowKey(lessThan(100L), Long.class));
assertThat(put, hasRowKey(not(greaterThan(100L)), Long.class));
assertThat(put, hasRowKey(startsWith("row"))); // default is String
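
To see what these conversions do to the raw bytes, the sketch below reproduces them with only the JDK. It assumes the row key bytes are big-endian, fixed-width encodings (the layout ByteBuffer uses by default), and the RowKeyDecoding class is hypothetical rather than part of the matcher library. Unlike valueOf above, it throws on an unsupported type instead of returning null.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

class RowKeyDecoding {
    /** Decodes raw row key bytes into the requested wrapper type. */
    static <T> T decode(byte[] bytes, Class<T> type) {
        if (type == byte[].class) {
            return type.cast(bytes);
        }
        if (type == String.class) {
            return type.cast(new String(bytes, StandardCharsets.UTF_8));
        }
        ByteBuffer buf = ByteBuffer.wrap(bytes); // big-endian by default
        if (type == Long.class)    return type.cast(buf.getLong());
        if (type == Integer.class) return type.cast(buf.getInt());
        if (type == Short.class)   return type.cast(buf.getShort());
        if (type == Double.class)  return type.cast(buf.getDouble());
        if (type == Float.class)   return type.cast(buf.getFloat());
        throw new IllegalArgumentException("Unsupported row key type: " + type);
    }

    public static void main(String[] args) {
        byte[] longKey = ByteBuffer.allocate(Long.BYTES).putLong(42L).array();
        byte[] textKey = "row-0001".getBytes(StandardCharsets.UTF_8);
        System.out.println(decode(longKey, Long.class));   // 42
        System.out.println(decode(textKey, String.class)); // row-0001
    }
}
```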

ColumnMatcher.java

Matching columns is a bit more involved. It was in fact the primary motivation for developing these classes, as this was the most convoluted and ugliest part of the unit tests. Column names in HBase are composed of a column family, which itself must be composed of printable characters, and a qualifier, which may be any sequence of bytes. This made it a challenge to build a matcher that is easy both to use and to read.

In real life, the projects I’ve worked on all employ human-readable column names. If we limit ourselves to string representations, we can construct a rather elegant matcher that accepts any Matcher<String>. If we follow the convention of using a colon to separate the family from the column qualifier, we can pass a single string of the form column-family:qualifier to our matcher and take advantage of the array of Matcher<String> implementations available with hamcrest, such as startsWith, endsWith, or containsString.

At this point, if I need to test a column name composed of a non-string qualifier, I would consider either a) creating a new matcher for this purpose, or b) resorting to the more brute force approach of iterating through the values and converting within the unit test. The core code:

	/*
	 * (non-Javadoc)
	 * @see org.hamcrest.TypeSafeDiagnosingMatcher#matchesSafely(java.lang.Object, org.hamcrest.Description)
	 */
	@Override
	protected boolean matchesSafely(Mutation mutation, Description mismatch) {
		return findMatches(mutation, mismatch, true).size() > 0;
	}

	/**
	 * 
	 * @param mutation
	 * @param mismatch
	 * @param stopOnFirstMatch
	 * @return
	 */
	protected List<KeyValue> findMatches(Mutation mutation, Description mismatch, boolean stopOnFirstMatch) {
		List<KeyValue> matches = new ArrayList<KeyValue>();
		Map<byte[], List<KeyValue>> familyMap = mutation.getFamilyMap();
		int count = 0;
		String columnName;
		for (Entry<byte[], List<KeyValue>> family : familyMap.entrySet()) {
			// Family must be composed of printable characters
			String familyStr = Bytes.toString(family.getKey());
			for (KeyValue column : family.getValue()) {
				String qualifier = Bytes.toString(column.getQualifier());
				// Match the name using the supplied matcher.
				columnName = familyStr + ":" + qualifier;
				if (this.nameMatcher.matches(columnName)) {
					matches.add(column);
					if (stopOnFirstMatch) {
						return matches;
					}
				}
				if (count++ > 0) {
					mismatch.appendText(", ");
				}
				nameMatcher.describeMismatch(columnName, mismatch);
			}
		}
		return matches;
	}

Examples of usage:

assertThat(put, hasColumn("a:column1"));
assertThat(put, hasColumn(is("a:column1")));
assertThat(put, hasColumn(startsWith("a:col")));
assertThat(put, hasColumn(not(startsWith("b:col"))));
assertThat(put, hasColumn(containsString("value")));
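
The family:qualifier convention can be exercised without HBase or hamcrest. In this simplified sketch, a java.util.function.Predicate<String> stands in for the Matcher<String>, and a map of family names to qualifier lists stands in for the mutation's family map; the ColumnNameMatching class and the sample column names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

class ColumnNameMatching {
    /** Returns the "family:qualifier" names accepted by the supplied name matcher. */
    static List<String> findMatches(Map<String, List<String>> familyMap,
                                    Predicate<String> nameMatcher,
                                    boolean stopOnFirstMatch) {
        List<String> matches = new ArrayList<>();
        for (Map.Entry<String, List<String>> family : familyMap.entrySet()) {
            for (String qualifier : family.getValue()) {
                // Build the single-string form the matcher is written against.
                String columnName = family.getKey() + ":" + qualifier;
                if (nameMatcher.test(columnName)) {
                    matches.add(columnName);
                    if (stopOnFirstMatch) {
                        return matches;
                    }
                }
            }
        }
        return matches;
    }

    public static void main(String[] args) {
        Map<String, List<String>> familyMap = new LinkedHashMap<>();
        familyMap.put("a", Arrays.asList("column1", "column2"));
        familyMap.put("b", Arrays.asList("other"));

        System.out.println(findMatches(familyMap, name -> name.startsWith("a:col"), false));
        System.out.println(findMatches(familyMap, "b:other"::equals, true));
    }
}
```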

KeyValueMatcher.java

The KeyValueMatcher enables us to write assertions that validate presence of an operation setting a cell to a specific value. Providing a column matcher is optional, allowing you to write an assertion for a value regardless of the column. We again leverage generics to enable typesafe conversions among the wrapper types and the use of the built-in hamcrest primitive matchers. The core code:

	/*
	 * (non-Javadoc)
	 * @see org.hamcrest.TypeSafeDiagnosingMatcher#matchesSafely(java.lang.Object, org.hamcrest.Description)
	 */
	@Override
	protected boolean matchesSafely(Mutation mutation, Description mismatch) {	
		// Delegate check for column match to the column matcher
		List<KeyValue> matchingKeyValues = columnMatcher.findMatches(mutation, mismatch, false);
		if (matchingKeyValues.size() == 0) {
			columnMatcher.describeMismatch(mutation, mismatch);
			return false;
		}

		// Check the key-values for a matching value
		int count = 0;
		for (KeyValue columnMatch : matchingKeyValues) {
			byte[] valueBytes = columnMatch.getValue();
			VAL value = (VAL)Matchers.valueOf(valueBytes, this.valueClass);
			if (valueMatcher.matches(value)){
				return true;
			}
			if (count++ > 0) {
				mismatch.appendText(", ");
			}
			valueMatcher.describeMismatch(value, mismatch);
		}
		return false;
	}

With KeyValueMatcher, we can put the whole thing together…

assertThat(put, hasKeyValue(hasColumn("a:column1"), "avalue1"));
assertThat(put, hasKeyValue(hasColumn("a:column1"), is("avalue1")));
assertThat(put, hasKeyValue(hasColumn(is("a:column1")), is("avalue1")));
assertThat(put, hasKeyValue(hasColumn(startsWith("a:col")), containsString("value")));

Getting the code

The full source code for these matchers is available on my “hadoop” project on github. Clone the project, take a look and let me know what you think!

Next steps…

And that’s basically all there is to it. The next bit of fun was hooking this all up to the map-reduce code. MRUnit was designed for this task and in a future post I’ll show how it can be used with MultiTableOutputFormat as both the primary job output and as part of a named multiout using Mockito’s ArgumentCaptor to intercept and inspect the operations.

Alfresco Share Support for SketchUp Files

I’m a huge fan of Alfresco, the Java-based open source ECM. Having spent the better part of a year migrating a proprietary customization and asset management platform into Alfresco, I developed a deep appreciation for the extensibility of the platform. Because it’s open source, I was able to make use of it at home for managing household documents – both electronic and scanned originals.

As an amateur woodworker and SketchUp user, I’ve also built up a small library of design documents used for past projects. Naturally, I decided to store these in the new Alfresco repository. However, I found the lack of native support for the SketchUp file format frustrating. I was accustomed to seeing thumbnail previews of my other documents and really wanted a preview for SketchUp as well.

Turns out that SketchUp files contain bitmap previews in PNG format, and one need only locate and extract the bits from the SKP file in order to make them available to Alfresco.

The heart of the AbstractContentTransformer2 extension class’s transformInternal method…

		// Write out the PNG
		InputStream in = reader.getContentInputStream();
		OutputStream out = writer.getContentOutputStream();
		try {
			BitmapExtractor extractor = extractors.get(sourceMimetype);
			if (extractor == null) {
				throw new IllegalArgumentException("No extractor configured " +
						"for source mimetype [" + sourceMimetype + "]");
			}
			extractor.extractBitmap(index, in, out);
		}
		finally {
			IOUtils.closeQuietly(in);
			IOUtils.closeQuietly(out);
		}

…calls on a custom PngExtractor to do the actual work (from the superclass AbstractDelimitedBitmapExtractor):

	public boolean extractBitmap(int n, InputStream in, OutputStream out) throws IOException {
		byte[] header = getHeaderSequence();
		byte[] footer = getFooterSequence();

		in = buffer(in);
		int count = 0;
		while (nextMatch(header, in)) {
			if (++count == n) {
				// This is the one we're after. Read in the PNG until the PNG_FOOTER...
				out.write(header);
				int b = 0;
				while((b = in.read()) != -1) {
					if (matches(footer, in, b)) {
						// Matched footer. Stream not reset so
						// write footer and exit.
						out.write(footer);
						return true;
					}
					out.write(b);
				}

			}
		}
		return false;
	}
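
For experimenting outside Alfresco, the same header-to-footer scan can be written against a byte array. This is a simplified sketch, not the extractor above: PngScan is a hypothetical class, it reads the whole file into memory rather than streaming, and the delimiters are assumed to be the standard 8-byte PNG signature and the IEND chunk type followed by its CRC, which may differ from what getHeaderSequence and getFooterSequence return.

```java
import java.util.Arrays;

class PngScan {
    // Standard PNG file signature.
    static final byte[] PNG_HEADER = {(byte) 0x89, 'P', 'N', 'G', 0x0D, 0x0A, 0x1A, 0x0A};
    // "IEND" chunk type followed by its fixed CRC; these bytes end every PNG stream.
    static final byte[] PNG_FOOTER = {'I', 'E', 'N', 'D', (byte) 0xAE, 0x42, 0x60, (byte) 0x82};

    /** Returns the index of the first occurrence of pattern at or after from, or -1. */
    static int indexOf(byte[] data, byte[] pattern, int from) {
        outer:
        for (int i = Math.max(from, 0); i <= data.length - pattern.length; i++) {
            for (int j = 0; j < pattern.length; j++) {
                if (data[i + j] != pattern[j]) {
                    continue outer;
                }
            }
            return i;
        }
        return -1;
    }

    /** Returns the n-th (1-based) embedded PNG, or null if it cannot be found. */
    static byte[] extractPng(byte[] data, int n) {
        if (n < 1) {
            throw new IllegalArgumentException("n must be >= 1");
        }
        int start = -1;
        int searchFrom = 0;
        for (int count = 0; count < n; count++) {
            start = indexOf(data, PNG_HEADER, searchFrom);
            if (start < 0) {
                return null; // fewer than n headers in the file
            }
            searchFrom = start + PNG_HEADER.length;
        }
        int end = indexOf(data, PNG_FOOTER, searchFrom);
        if (end < 0) {
            return null; // header without a matching footer
        }
        return Arrays.copyOfRange(data, start, end + PNG_FOOTER.length);
    }
}
```

A caller could read the SKP file with java.nio.file.Files.readAllBytes and write the returned array out as a .png; streaming, as the extractor above does, avoids holding a large model in memory.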

The full set of classes is available on Google Code under the alfresco-extra-mimetypes project. I hope to extend support to CorelDraw and InDesign documents, among others, where possible. Enjoy.