Wednesday, March 25, 2015

A Generic Normalizer for Pentaho Data integration - Revisited

A while ago, I wrote about how to create a generic normalizer for Pentaho Data integration.

To freshen up your memory, the generic normalizer takes any input stream, and for each input row, it outputs one row for each field in the input stream. The output rows contain fields for input row number, input field number and input field value. As such it provides the same functionality as the built-in Row Normaliser step without requiring any configuration, thus allowing it to process arbitrary streams of data.

A reusable normalizer

Recently I received an comment asking for more information on how to make this normalizer more reusable:
I want to use this method to do field level auditing but I want to encapsulate it in a sub transformation to which I can pass the result rows from any step. In your image of "how all these steps work together", instead of a data grid, the input would need to be dynamic in terms of the number of fields/columns and the data types. Could you possibly provide a hint how to make the input to these steps (in your example, the datagrid) dynamic?
In the mean while, I learned a thing or two about kettle's internals and it seemed like a good idea to describe how to improve on the original example and make it suitable to be used in a so-called Mapping, a.k.a. a sub-transformation.

Design for use as Subtransformation

The design for the re-usable generic normalizer is shown below:
The User-defined Java class step in the middle actually implements the normalizer. The Mapping input and Mapping output specification steps allow the normalizer to be called from another transformation. They enable it to respectively receive input data from, and return output data to the calling transformation.

In the screenshot above, the configuration dialogs for both the Mapping input and output specification steps are shown. This is mainly to show that there is no configuration involved: the Mapping input specification step will faithfully pass all fields received from the incoming stream on to the normalizer, and the Mapping output specification will output all fields coming out of the normalizer to the outgoing stream.

Normalizer Improvements

The configuration of the user-defined Java class step differs in a number of aspects from what I used in the original normalizer example. In the original example the normalizer output consisted of three fields:
A sequential integer number identifying the position of the row to which the current output row applies.
A sequential integer number identifying the position of the field to which the current output row applies.
A string representation of the value to which the output applies
The original example used a Metadata Structure of Stream step to obtain metadata of the input stream, and this metadata was then tied to the output of the normalizer using a Stream Lookup step, "joining" the output of the Metadata Structure step with the output of the normalizer using the field number.
The improved generic normalizer adds two more output fields:
The name of the field as it appears in the input stream
The name of the data type of the field as it appears in the input stream
Argueably, these two items are the most important pieces of metadata that were previously provided by the Metadata Structure of Stream in the original example, and I felt many people would probably prefer to have all of that work done by the normalizer itself rather than having to tie all the pieces together in the transformation itself using additional steps.


The code for the user-defined Java class is shown below:
static long rownum = 0;
static RowMetaInterface inputRowMeta;
static long numFields;
static String[] fieldNames;

public boolean processRow(StepMetaInterface smi, StepDataInterface sdi) throws KettleException
  // get the current row
  Object[] r = getRow();

  // If the row object is null, we are done processing.
  if (r == null) {
    return false;

  // If this is the first row, cache some metadata.
  // We will reuse this metadata in processing the rest of the rows.
  if (first) {
    inputRowMeta = getInputRowMeta();
    numFields = inputRowMeta.size(); 
  // Generate a new id number for the current row.
  rownum += 1;

  // Generate one output row for each field in the input stream.
  int fieldnum;
  ValueMetaInterface valueMetaInterface;
  for (fieldnum = 0; fieldnum < numFields; fieldnum++) {
    //get metadata for the current field
    valueMetaInterface = inputRowMeta.getValueMeta(fieldnum);
    Object[] outputRow = new Object[5];
    outputRow[0] = rownum;
    // Assign the field id. Note that we need to cast to long to match Kettle's type system.
    outputRow[1] = (long)fieldnum+1;
    //assign the data type name
    outputRow[2] = valueMetaInterface.getTypeDesc();
    //assign the field name
    outputRow[3] = valueMetaInterface.getName();
    //assign a string representation of the field value
    outputRow[4] = inputRowMeta.getString(r, fieldnum);
    //emit a row.
    putRow(data.outputRowMeta, outputRow);
  return true;
The main difference with the original code is the addition of the two new output fields, fieldname and fieldtype. In order to obtain the values for these fields, the loop over the fields first obtains the ValueMetaInterface object for the current field. This is done by calling the getValueMeta() method of the RowMetaInterface object and passing the index of the desired field.
Using the ValueMetaInterface object, the field name is obtained using its getName() method. The data type name is obtained by calling its getTypeDesc() method.

Calling the normalizer as subtransformation

Using the improved normalizer is as simple as adding a Mapping-step to your transformation and pointing it to the transformation that contains the normalizer and Mapping input and output specifications:

Download samples

The transformations discussed in this post are available here: These transformations are in the public domain: you can use, copy, redistribute and modify these transformations as you see fit. You are encouraged but not obliged to share any modifications that you make to these examples.


Anonymous said...

1st: Thank you so much for this awesome article. I'm going to implement this right away.
2nd: FYI the images in this article are giving 403 errors and don't display currently.
Thanks and I'll be checking this blog frequently!

rpbouman said...

Thanks for the kind words, and thanks for the heads-up re the images. I used to use Flickr for those and apparently they stopped supporting direct linking. So now I'm using my google drive for it, but I had to struggle a bit before I could find out to do direct linking from there. Images should be showing now.

Anonymous said...

Also, in order to make it truly generic, check the "Include unspecified fields, ordered by name" check box. That way you don't have to specify any fields in the Mapping Input Specification.
Thanks Roland!

rpbouman said...

"Also, in order to make it truly generic, check the "Include unspecified fields, ordered by name" check box."

Yes, you'd think that.

But for me, it works without checking that. (checked on kettle 5.3) Does it not work for you that way?

Checking the checkbox implies order of the fields will be changed, which I would rather avoid if possible.

rpbouman said...

I mean, even if I don't check that box, I still receive all fields form the incoming stream.

Anonymous said...

You're absolutely correct Sir. I receive all the fields either way.

rpbouman said...

Anonymous, no problem. I was confused to and I had to check it first before deciding to set it up like this.

Of course, it should not be a problem if you do check that box but the fields will then presumably be re-ordered.

Unknown said...

Thanks for excelent job.

Finally made my dynamic excel import with unknown colum count, sequence and name, by combining your normalizer and Metadata Injection step.
It was a real pain before, when customers changed order or added new columns etc in their import spreadsheets. I wonder why generic Microsoft Excel input step doesn't handle it automatically out of the box.


rpbouman said...

Normunds, I'm glad this was of some use to you.

Regarding Excel, It could be argued either way: if the format changes, many people will want to know and not accidentally and silently import or export data they don't know about. But I can feel your pain when you really need it to work dynamicallly.

Best regards


Anonymous said...

Thank you very much!

Koen said...

It looks like there is a small bug: the output data has a column called "fieldname" but actually it holds the "fieldtype" and vice versa...

Anonymous said...

How can i use this mapping step with the ktr developed by using metadata injection,as mapping doesn't support metadata injection

rpbouman said...

Anonymous, sounds like off-topic? Use the kettle API if you don't have metadata injection.

Anonymous said...

Hey Bouman,
I'm using Kettle for sometime(not a java guy), i got a scenario where i get an excel consider i have 10 columns where first 4 columns are fixed and after 4th column i need to unpivot. Can your java class be altered in such a way that i could it could fit my scenario. BTW how do i read the excel data without defining the columns prior. Please help.

rpbouman said...

"Can your java class be altered in such a way that i could it could fit my scenario. BTW how do i read the excel data without defining the columns prior. Please help."

This has been a while, and I'm not entirely up to date on the latest state of affairs re. Kettle. In the mean while, many improvements have been added to metadata injection - please check it out:

Perhaps you can use metadata injection for both of your requirements?


Nowadays, many SQL implementations offer some form of aggregate string concatenation function. Being an aggregate function, it has the effe...