MapReduce: Joining Multiple Avro Files Using Custom Serialization

Recently at work I was tasked with developing a number of mapreduce jobs involving Avro files. The final step of the process was a join between avro and tab-delimited text files. Because the final output consisted of the original avro records updated with information from the flat files, I decided to convert the text files into avro in the mapper and then send them on for joining in the reduce phase.

Here’s where I ran into some trouble.

AvroWrapper, AvroKey, AvroValue

Although Avro is well supported in Hadoop MapReduce, the design is rather limiting. A typical Writable bean is capable of performing serde operations on itself. This design is clean, compact, and makes it relatively easy to implement your own custom writables. However, Avro records require a schema to be deserialized. To eliminate the need to include the schema with every individual Avro record, the library includes an AvroWrapper and its two subclasses, AvroKey and AvroValue. The AvroWrapper, as it sounds, merely wraps an individual record or datum. You don’t use the wrapper directly, however. When specifying a key or value in your job, you select AvroKey for keys and AvroValue for values (duh, right?).

If you look into these two subclasses you’ll discover something curious — they’re essentially empty.

The reason for this is that serde operations on these classes are handled by the AvroSerialization class, as configured through the AvroJob static helper methods. The reader and writer schemas are registered with the serialization class, and the AvroKey and AvroValue wrappers act merely as tags identifying which schema to use for your records. This works quite well for most jobs; however, it leaves us limited to one schema for keys and another for values. But what if we want to perform a join operation on two or more Avro records with different schemas?
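
Concretely, the standard setup pins the whole shuffle to one key schema and one value schema. A minimal sketch, assuming the newer org.apache.avro.mapreduce.AvroJob helpers (the helper names vary a bit across Avro versions):

public static void configureStandardAvroShuffle(Job job, Schema recordSchema) {
	// Stock single-schema configuration: one schema for all map output keys
	// and one for all map output values, job-wide.
	AvroJob.setMapOutputKeySchema(job, Schema.create(Schema.Type.STRING));
	AvroJob.setMapOutputValueSchema(job, recordSchema);
}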

AvroMultiWrapper and MultiSchemaAvroSerialization

To work around this limitation, a new serialization implementation was needed. Clearly we couldn’t embed the schema with each key, as this would explode the volume of data flowing through the job. I also refused to add a separate tagging interface for each schema, since that would mean creating custom wrapper extensions for every record type in this job and in future jobs using the library.

What if, however, you could tag each record with the appropriate reader schema for deserialization? Limiting this tag to a single byte would keep the overhead low while supporting up to 128 different schemas.
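
The wrapper that carries the datum stays as thin as the stock AvroWrapper; it is really just a datum holder. A sketch of roughly what AvroMultiWrapper amounts to (the real class may differ slightly):

// Sketch of the wrapper: it only holds the datum, just like AvroWrapper.
// The one-byte schema tag lives in the serialization, not in the wrapper.
public class AvroMultiWrapper<T> {

	private T datum;

	public AvroMultiWrapper() {
	}

	public T datum() {
		return datum;
	}

	public void datum(T datum) {
		this.datum = datum;
	}
}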

The design mirrors the standard Avro mapreduce libraries. To configure schemas in the system, you call the static MultiSchemaAvroSerialization.registerSchemas method which populates the set of schemas:

public static void registerSchemas(Job job, Schema... schemas) {
	// Store each schema's full name in the job configuration; a schema's
	// position in this array becomes its single-byte tag on the wire.
	String[] names = new String[schemas.length];
	int idx = 0;
	for (Schema schema : schemas) {
		names[idx++] = schema.getFullName();
	}
	job.getConfiguration().setStrings(CONF_KEY_MULTI_SCHEMAS, names);
	// Register MultiSchemaAvroSerialization with the job's serialization factory.
	registerSerialization(job);
}

For your map output key and/or value you’d specify the AvroMultiWrapper and that’s it.
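
As a sketch of that wiring (the key type and record classes here are placeholders for whatever your job actually joins):

public static void configureJoinJob(Job job) {
	// Register every schema the mappers may emit; the registration order
	// determines each schema's one-byte tag.
	MultiSchemaAvroSerialization.registerSchemas(job,
			OriginalRecord.getClassSchema(),
			FlatFileRecord.getClassSchema());
	// The join key is plain text here; the values are the wrapped Avro records.
	job.setMapOutputKeyClass(Text.class);
	job.setMapOutputValueClass(AvroMultiWrapper.class);
}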

When the key and/or value datum is serialized, the MultiSchemaAvroSerialization first looks up the ordinal of the schema in the registered set and writes it as the first byte of the serialized datum:

public void serialize(AvroMultiWrapper<T> avroWrapper) throws IOException {
	DatumWriter<T> writer = datumWriterFor((Class<T>) avroWrapper.datum().getClass());
	// Prefix the datum with the registered index of its schema...
	int b = MultiSchemaAvroSerialization.getIndexForSchema(getConf(), avroWrapper.datum().getClass());
	outputStream.write(b);
	// ...then let the datum writer serialize the record as usual.
	writer.write(avroWrapper.datum(), encoder);
	this.encoder.flush();
}
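
The getIndexForSchema helper is just the registration in reverse: it finds the position of the record's full name among the names stored in the configuration. A sketch, assuming generated specific records whose class name matches the schema's full name:

// Sketch of the index lookup: schemas were registered by full name, so the
// datum's class name tells us its position (and therefore its tag byte).
public static int getIndexForSchema(Configuration conf, Class<?> datumClass) {
	String[] names = conf.getStrings(CONF_KEY_MULTI_SCHEMAS);
	for (int i = 0; i < names.length; i++) {
		if (names[i].equals(datumClass.getName())) {
			return i;
		}
	}
	throw new IllegalStateException("Schema not registered for " + datumClass.getName());
}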

Deserializing our wrapper is the same process, only in reverse:

public AvroMultiWrapper<T> deserialize(AvroMultiWrapper<T> wrapper)
		throws IOException {
	if (wrapper == null) {
		wrapper = new AvroMultiWrapper<T>();
	}

	// Read in the first byte - the schema index
	int schemaIndex = decoder.inputStream().read();

	// Now hand off the rest to the datum reader for normal deser.
	DatumReader<T> reader = datumReaderFor(schemaIndex);
	wrapper.datum(reader.read(wrapper.datum(), decoder));
	return wrapper;
}
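
The datumReaderFor call is where the tag byte turns back into a schema. One way to do it, assuming the registered schemas come from generated specific-record classes, looks roughly like this (a sketch, not the exact implementation):

// Sketch of resolving a reader from the tag byte: look up the class name
// registered at that index and build a SpecificDatumReader for its schema.
private DatumReader<T> datumReaderFor(int schemaIndex) throws IOException {
	String name = getConf().getStrings(MultiSchemaAvroSerialization.CONF_KEY_MULTI_SCHEMAS)[schemaIndex];
	try {
		Class<?> recordClass = getConf().getClassByName(name);
		return new SpecificDatumReader<T>(SpecificData.get().getSchema(recordClass));
	} catch (ClassNotFoundException e) {
		throw new IOException("No record class found for schema index " + schemaIndex, e);
	}
}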

I’ve used this successfully in conjunction with the standard AvroSerialization and AvroKey/AvroValue classes, reading input from Avro files and then emitting multiple schemas from the Mapper class. I haven’t benchmarked the performance, as there wasn’t really an alternative for my job; the only additional overhead should be the extra byte per record, and the schema lookups should cost roughly the same in the “standard” serialization as in the multi-schema version.
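
For the flat-file side, the mapper that converts a text row into an Avro record and emits it through the wrapper can be as simple as the sketch below (the record type, field setters, and join key are placeholders):

// Sketch of the text-side mapper: parse the tab-delimited line into an Avro
// record (placeholder FlatFileRecord) and emit it under its join key. The
// Avro-input mapper emits its own records through the same wrapper type.
public static class FlatFileJoinMapper
		extends Mapper<LongWritable, Text, Text, AvroMultiWrapper<SpecificRecord>> {

	@Override
	protected void map(LongWritable offset, Text line, Context context)
			throws IOException, InterruptedException {
		String[] fields = line.toString().split("\t");
		FlatFileRecord record = FlatFileRecord.newBuilder()
				.setId(fields[0])
				.setUpdate(fields[1])
				.build();
		AvroMultiWrapper<SpecificRecord> wrapper = new AvroMultiWrapper<SpecificRecord>();
		wrapper.datum(record);
		context.write(new Text(fields[0]), wrapper);
	}
}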

Please take a look at the code and let me know what you think!
