
MapReduce: Joining Multiple Avro Files Using Custom Serialization

Recently at work I was tasked with developing a number of MapReduce jobs involving Avro files. The final step of the process was a join between Avro and tab-delimited text files. Because the final output consisted of the original Avro records updated with information from the flat files, I decided to convert the text files into Avro in the mapper and then send them on for joining in the reduce phase.

Here’s where I ran into some trouble.

AvroWrapper, AvroKey, AvroValue

Although Avro is well supported in Hadoop MapReduce, the design is rather limiting. A typical Writable bean is capable of performing serde operations on itself. This design is clean, compact, and makes it relatively easy to implement your own custom writables. Avro records, however, require a schema to be deserialized. To eliminate the need to include the schema with every individual Avro record, the library provides AvroWrapper and its subclasses AvroKey and AvroValue. The AvroWrapper, as it sounds, merely wraps an individual record or datum. You don’t use the wrapper directly, however. When specifying a key or value in your job, you select AvroKey for keys and AvroValue for values (duh, right?).

If you look into these two subclasses you’ll discover something curious — they’re essentially empty.

The reason for this is that serde operations on these classes are handled by the AvroSerialization class, as configured through the AvroJob static helper methods. The reader and writer schemas are registered with the serialization class, and the AvroKey and AvroValue wrappers act merely to tag your records and identify which schema to use. This works quite well for most jobs; however, it leaves us limited to one set of schemas for keys and another for values. But what if we want to perform a join operation on two or more Avro records with different schemas?
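For contrast, here is a minimal sketch of the standard single-schema setup described above. The job name and the UserRecord generated class are assumptions for illustration; the AvroJob helpers come from Avro's newer mapreduce package.

// Standard single-schema configuration (sketch). Each AvroJob helper accepts
// exactly one schema per slot, which is the limitation discussed above.
// UserRecord is an assumed Avro-generated class used only for illustration.
Job job = Job.getInstance(new Configuration(), "avro-join");
AvroJob.setMapOutputKeySchema(job, Schema.create(Schema.Type.STRING));
AvroJob.setMapOutputValueSchema(job, UserRecord.getClassSchema());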

AvroMultiWrapper and MultiSchemaAvroSerialization

To work around this limitation, a new serialization implementation was needed. Clearly we couldn’t embed the schema with each key, as this would explode the volume of data flowing through the job. I also refused to add separate tagging interfaces for each schema, as this would mean creating custom wrapper extensions for each record type for this job and any future jobs using the library.

What if, however, you could tag each record with the appropriate reader schema for deserialization? Limiting this tag to a single byte would keep the overhead low while supporting up to 128 different schemas.

The design mirrors the standard Avro MapReduce libraries. To configure schemas in the system, you call the static MultiSchemaAvroSerialization.registerSchemas method, which registers the set of schemas with the job configuration:

public static void registerSchemas(Job job, Schema... schemas) {
	// Record the fully qualified name of each schema; the position of a name
	// in this list is the index byte written with every serialized datum.
	String[] names = new String[schemas.length];
	int idx = 0;
	for (Schema schema : schemas) {
		names[idx++] = schema.getFullName();
	}
	job.getConfiguration().setStrings(CONF_KEY_MULTI_SCHEMAS, names);
	// Register this serialization implementation with the job.
	registerSerialization(job);
}

For your map output key and/or value class you’d specify AvroMultiWrapper, and that’s it.
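For example, a job joining two record types might be configured along these lines. This is only a sketch: UserRecord and ClickRecord are hypothetical generated Avro classes and the Text key is arbitrary; only registerSchemas and AvroMultiWrapper come from the library.

// Hypothetical setup sketch: register both schemas with the job and use
// AvroMultiWrapper as the map output value class.
MultiSchemaAvroSerialization.registerSchemas(job,
		UserRecord.getClassSchema(),
		ClickRecord.getClassSchema());
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(AvroMultiWrapper.class);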

When the key and/or value datum is serialized, the MultiSchemaAvroSerialization will first look up the ordinal of the schema in the registered set and include that as the first byte of the serialized datum:

public void serialize(AvroMultiWrapper<T> avroWrapper) throws IOException {
	DatumWriter<T> writer = datumWriterFor((Class<T>) avroWrapper.datum().getClass());
	// Look up the registered index of this datum's schema and write it as the first byte.
	int b = MultiSchemaAvroSerialization.getIndexForSchema(getConf(), avroWrapper.datum().getClass());
	outputStream.write(b);
	// The datum itself is then written normally by the Avro datum writer.
	writer.write(avroWrapper.datum(), encoder);
	this.encoder.flush();
}

The same thing happens, only in reverse, when deserializing our wrapper:

public AvroMultiWrapper<T> deserialize(AvroMultiWrapper<T> wrapper)
		throws IOException {
	if (wrapper == null) {
		wrapper = new AvroMultiWrapper<T>();
	}

	// Read in the first byte - the schema index
	int schemaIndex = decoder.inputStream().read();

	// Now hand off the rest to the datum reader for normal deser.
	DatumReader<T> reader = datumReaderFor(schemaIndex);
	wrapper.datum(reader.read(wrapper.datum(), decoder));
	return wrapper;
}
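Neither getIndexForSchema nor datumReaderFor is shown here; roughly speaking, they resolve a datum class or index against the schema names registered in the job configuration. Here is a sketch of what the index lookup might look like (an assumption, not the library's actual code):

// Sketch (an assumption, not the library's actual code) of how a datum class
// could be resolved to the index registered by registerSchemas().
static int getIndexForSchema(Configuration conf, Class<?> datumClass) {
	String[] names = conf.getStrings(CONF_KEY_MULTI_SCHEMAS);
	for (int i = 0; i < names.length; i++) {
		// Generated Avro classes use the schema's full name as their class name.
		if (names[i].equals(datumClass.getName())) {
			return i;
		}
	}
	throw new IllegalStateException("Schema not registered for " + datumClass.getName());
}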

I’ve used this successfully in conjunction with the standard AvroSerialization and AvroKey/AvroValue classes when reading from Avro files and then emitting multiple schemas from the Mapper class. I haven’t benchmarked the performance, as there wasn’t really an alternative for my job; however, the only additional overhead should be the extra byte per datum. The schema lookups should cost essentially the same in the “standard” serialization and the multi-schema version.
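For a rough idea of what that looks like in practice, here is a sketch of such a mapper. UserRecord, getUserId(), and the Text join key are assumptions for illustration; only AvroKey and AvroMultiWrapper come from the libraries discussed here.

// Sketch of a mapper that reads Avro input via AvroKey and emits its records
// wrapped in AvroMultiWrapper, keyed by the join field. UserRecord and
// getUserId() are assumed generated Avro artifacts, used only for illustration.
public class AvroJoinMapper
		extends Mapper<AvroKey<UserRecord>, NullWritable, Text, AvroMultiWrapper<SpecificRecord>> {

	private final Text joinKey = new Text();
	private final AvroMultiWrapper<SpecificRecord> wrapper = new AvroMultiWrapper<SpecificRecord>();

	@Override
	protected void map(AvroKey<UserRecord> key, NullWritable value, Context context)
			throws IOException, InterruptedException {
		UserRecord user = key.datum();
		joinKey.set(user.getUserId().toString());
		// Wrap the datum; MultiSchemaAvroSerialization tags it with its schema index on write.
		wrapper.datum(user);
		context.write(joinKey, wrapper);
	}
}

A second mapper over the tab-delimited files would convert each line into its own record type and emit it the same way; since both schemas are registered, the reducer can deserialize either.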

Please take a look at the code and let me know what you think!

Custom Hamcrest Matchers for Testing HBase Puts

A fairly common pattern for my work these days involves ETL-type jobs using MapReduce. These operate on simple flat-file inputs and, after some fairly basic transformation steps, emit the results into one or more HBase tables. For my initial job, I used MRUnit as a tool for a test-driven development process. I then proceeded to develop a series of unit tests utilizing Hamcrest’s excellent built-in matchers. This all worked well enough initially; however, the resulting test cases soon grew into a rather complex (and brittle) mess of method calls and iterations.

For example, to find out if we had issued a “put” for a given column and value, we might use something like the following:

byte[] columnFamily = "a".getBytes();
byte[] columnQualifier = "column1".getBytes();
byte[] expectedValue = "value".getBytes();

// Scan every KeyValue for the column, looking for one with the expected value.
List<KeyValue> kvList = put.get(columnFamily, columnQualifier);
boolean found = false;
for (KeyValue kv : kvList) {
	byte[] actual = kv.getValue();
	if (Arrays.equals(actual, expectedValue)) {
		found = true;
		break;
	}
}
assertTrue(found);

In an effort to clean this up, I searched around for some HBase-specific matchers, and finding none, decided to develop and contribute my own to the cause.

RowKeyMatcher.java

Row keys, like all of the values stored in HBase, are persisted as byte arrays. These values are frequently better, and more usefully, represented in a different data type. The row key matcher handles these conversions for you, making for cleaner and more readable test code. The RowKeyMatcher is an extension of the Hamcrest FeatureMatcher, making for a pretty simple class:

public class RowKeyMatcher<T> extends FeatureMatcher<Mutation, T> {

	public static final String NAME = "Put Row Key";

	public static final String DESCRIPTION = "row key";

	private final Class<T> valueClass;

	public RowKeyMatcher(Matcher<? super T> subMatcher, Class<T> valueClass) {
		super(subMatcher, NAME, DESCRIPTION);
		this.valueClass = valueClass;
	}

	/*
	 * (non-Javadoc)
	 * @see org.hamcrest.FeatureMatcher#featureValueOf(java.lang.Object)
	 */
	@Override
	protected T featureValueOf(Mutation mutation) {
		byte[] bytes = mutation.getRow();
		return (T)valueOf(bytes, this.valueClass);
	}

	public <T> T valueOf(byte[] bytes, Class<? extends T> valueClass) {
		if (byte[].class.equals(valueClass)) {
			return (T) bytes;
		}
		else if (String.class.equals(valueClass)) {
			return (T) Bytes.toString(bytes);
		}
		else if (Long.class.equals(valueClass)) {
			return (T) Long.valueOf(Bytes.toLong(bytes));
		}
		else if (Double.class.equals(valueClass)) {
			return (T) Double.valueOf(Bytes.toDouble(bytes));
		}
		else if (Float.class.equals(valueClass)) {
			return (T) Float.valueOf(Bytes.toFloat(bytes));
		}
		else if (Integer.class.equals(valueClass)) {
			return (T) Integer.valueOf(Bytes.toInt(bytes));
		}
		else if (Short.class.equals(valueClass)) {
			return (T) Short.valueOf(Bytes.toShort(bytes));
		}
		return null;
	}
}

It may be used as follows:

assertThat(put, hasRowKey(lessThan(100L), Long.class));
assertThat(put, hasRowKey(not(greaterThan(100L)), Long.class));
assertThat(put, hasRowKey(startsWith("row"))); // default is String
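The hasRowKey factory used in these assertions isn’t shown above; here’s a minimal sketch of what such factory methods might look like, given the constructor shown earlier (the String default is an assumption based on the comment in the last example):

// Sketch of hypothetical factory methods for RowKeyMatcher; the real signatures
// live in the github project, and these are assumptions for illustration only.
public static <T> RowKeyMatcher<T> hasRowKey(Matcher<? super T> valueMatcher, Class<T> valueClass) {
	return new RowKeyMatcher<T>(valueMatcher, valueClass);
}

// Convenience overload: treat the row key as a String by default.
public static RowKeyMatcher<String> hasRowKey(Matcher<? super String> valueMatcher) {
	return hasRowKey(valueMatcher, String.class);
}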

ColumnMatcher.java

Matching columns is a bit more involved. It was in fact the primary motivation for developing these classes, as it was this part of the unit tests that was the most convoluted and looked the ugliest. Column names in HBase are composed of a column family, which itself must be composed of printable characters, and a qualifier, which may be any sequence of bytes. This posed some challenges to making the matcher both easy to use and easy to read.

In real life, the projects I’ve worked on all employ human-readable column names. If we limit ourselves to string representations, we can construct a rather elegant matcher that accepts any Matcher<String>. If we follow the convention of using a colon to separate the family from the column qualifier, we can pass a single string of the form column-family:qualifier to our matcher and take advantage of the wide range of Matcher<String> implementations available with Hamcrest, including startsWith, endsWith, and containsString. If I ever need to test a column name composed of a non-string qualifier, I would consider either a) creating a new matcher for that purpose, or b) resorting to the more brute-force approach of iterating through the values and converting within the unit test. The core code:

	/*
	 * (non-Javadoc)
	 * @see org.hamcrest.TypeSafeDiagnosingMatcher#matchesSafely(java.lang.Object, org.hamcrest.Description)
	 */
	@Override
	protected boolean matchesSafely(Mutation mutation, Description mismatch) {
		return findMatches(mutation, mismatch, true).size() > 0;
	}

	/**
	 * Finds the KeyValues in the given mutation whose column name matches the configured name matcher.
	 *
	 * @param mutation the Put (or other Mutation) under test
	 * @param mismatch the description to which mismatch text is appended
	 * @param stopOnFirstMatch if true, return as soon as the first match is found
	 * @return the matching KeyValues; empty if none matched
	 */
	protected List<KeyValue> findMatches(Mutation mutation, Description mismatch, boolean stopOnFirstMatch) {
		List<KeyValue> matches = new ArrayList<KeyValue>();
		Map<byte[], List<KeyValue>> familyMap = mutation.getFamilyMap();
		int count = 0;
		String columnName;
		for (Entry<byte[], List<KeyValue>> family : familyMap.entrySet()) {
			// The family must be composed of printable characters
			String familyStr = Bytes.toString(family.getKey());
			for (KeyValue column : family.getValue()) {
				String qualifier = Bytes.toString(column.getQualifier());
				// Match the full family:qualifier name using the supplied matcher.
				columnName = familyStr + ":" + qualifier;
				if (this.nameMatcher.matches(columnName)) {
					matches.add(column);
					if (stopOnFirstMatch) {
						return matches;
					}
				}
				else {
					// Only non-matching columns contribute to the mismatch description.
					if (count++ > 0) {
						mismatch.appendText(", ");
					}
					nameMatcher.describeMismatch(columnName, mismatch);
				}
			}
		}
		return matches;
	}

Examples of usage:

assertThat(put, hasColumn("a:column1"));
assertThat(put, hasColumn(is("a:column1")));
assertThat(put, hasColumn(startsWith("a:col")));
assertThat(put, hasColumn(not(startsWith("b:col"))));
assertThat(put, hasColumn(containsString("column1")));

KeyValueMatcher.java

The KeyValueMatcher enables us to write assertions that validate the presence of an operation setting a cell to a specific value. Providing a column matcher is optional, allowing you to write an assertion for a value regardless of the column. We again leverage generics to enable type-safe conversions among the wrapper types and the use of the built-in Hamcrest primitive matchers. The core code:

	/*
	 * (non-Javadoc)
	 * @see org.hamcrest.TypeSafeDiagnosingMatcher#matchesSafely(java.lang.Object, org.hamcrest.Description)
	 */
	@Override
	protected boolean matchesSafely(Mutation mutation, Description mismatch) {
		// Delegate the column-name check to the supplied column matcher.
		List<KeyValue> matchingKeyValues = columnMatcher.findMatches(mutation, mismatch, false);
		if (matchingKeyValues.size() == 0) {
			columnMatcher.describeMismatch(mutation, mismatch);
			return false;
		}

		// Check the matching key-values for a matching value
		int count = 0;
		for (KeyValue columnMatch : matchingKeyValues) {
			byte[] valueBytes = columnMatch.getValue();
			VAL value = (VAL) Matchers.valueOf(valueBytes, this.valueClass);
			if (valueMatcher.matches(value)) {
				return true;
			}
			if (count++ > 0) {
				mismatch.appendText(", ");
			}
			valueMatcher.describeMismatch(value, mismatch);
		}
		return false;
	}

With KeyValueMatcher, we can put the whole thing together…

assertThat(put, hasKeyValue(hasColumn("a:column1"), "avalue1"));
assertThat(put, hasKeyValue(hasColumn("a:column1"), is("avalue1")));
assertThat(put, hasKeyValue(hasColumn(is("a:column1")), is("avalue1")));
assertThat(put, hasKeyValue(hasColumn(startsWith("a:col")), containsString("value")));

Getting the code

The full source code for these matchers is available in my “hadoop” project on GitHub. Clone the project, take a look, and let me know what you think!

Next steps…

And that’s basically all there is to it. The next bit of fun was hooking this all up to the MapReduce code. MRUnit was designed for this task, and in a future post I’ll show how it can be used with MultiTableOutputFormat, both as the primary job output and as part of a named multiple output, using Mockito’s ArgumentCaptor to intercept and inspect the operations.