Wednesday, March 25, 2015

A Generic Normalizer for Pentaho Data integration - Revisited

A while ago, I wrote about how to create a generic normalizer for Pentaho Data integration.

To freshen up your memory, the generic normalizer takes any input stream, and for each input row, it outputs one row for each field in the input stream. The output rows contain fields for input row number, input field number and input field value. As such it provides the same functionality as the built-in Row Normaliser step without requiring any configuration, thus allowing it to process arbitrary streams of data.

A reusable normalizer

Recently I received an comment asking for more information on how to make this normalizer more reusable:
I want to use this method to do field level auditing but I want to encapsulate it in a sub transformation to which I can pass the result rows from any step. In your image of "how all these steps work together", instead of a data grid, the input would need to be dynamic in terms of the number of fields/columns and the data types. Could you possibly provide a hint how to make the input to these steps (in your example, the datagrid) dynamic?
In the mean while, I learned a thing or two about kettle's internals and it seemed like a good idea to describe how to improve on the original example and make it suitable to be used in a so-called Mapping, a.k.a. a sub-transformation.

Design for use as Subtransformation

The design for the re-usable generic normalizer is shown below:
The User-defined Java class step in the middle actually implements the normalizer. The Mapping input and Mapping output specification steps allow the normalizer to be called from another transformation. They enable it to respectively receive input data from, and return output data to the calling transformation.

In the screenshot above, the configuration dialogs for both the Mapping input and output specification steps are shown. This is mainly to show that there is no configuration involved: the Mapping input specification step will faithfully pass all fields received from the incoming stream on to the normalizer, and the Mapping output specification will output all fields coming out of the normalizer to the outgoing stream.

Normalizer Improvements

The configuration of the user-defined Java class step differs in a number of aspects from what I used in the original normalizer example. In the original example the normalizer output consisted of three fields:
A sequential integer number identifying the position of the row to which the current output row applies.
A sequential integer number identifying the position of the field to which the current output row applies.
A string representation of the value to which the output applies
The original example used a Metadata Structure of Stream step to obtain metadata of the input stream, and this metadata was then tied to the output of the normalizer using a Stream Lookup step, "joining" the output of the Metadata Structure step with the output of the normalizer using the field number.
The improved generic normalizer adds two more output fields:
The name of the field as it appears in the input stream
The name of the data type of the field as it appears in the input stream
Argueably, these two items are the most important pieces of metadata that were previously provided by the Metadata Structure of Stream in the original example, and I felt many people would probably prefer to have all of that work done by the normalizer itself rather than having to tie all the pieces together in the transformation itself using additional steps.


The code for the user-defined Java class is shown below:
static long rownum = 0;
static RowMetaInterface inputRowMeta;
static long numFields;
static String[] fieldNames;

public boolean processRow(StepMetaInterface smi, StepDataInterface sdi) throws KettleException
  // get the current row
  Object[] r = getRow();

  // If the row object is null, we are done processing.
  if (r == null) {
    return false;

  // If this is the first row, cache some metadata.
  // We will reuse this metadata in processing the rest of the rows.
  if (first) {
    inputRowMeta = getInputRowMeta();
    numFields = inputRowMeta.size(); 
  // Generate a new id number for the current row.
  rownum += 1;

  // Generate one output row for each field in the input stream.
  int fieldnum;
  ValueMetaInterface valueMetaInterface;
  for (fieldnum = 0; fieldnum < numFields; fieldnum++) {
    //get metadata for the current field
    valueMetaInterface = inputRowMeta.getValueMeta(fieldnum);
    Object[] outputRow = new Object[5];
    outputRow[0] = rownum;
    // Assign the field id. Note that we need to cast to long to match Kettle's type system.
    outputRow[1] = (long)fieldnum+1;
    //assign the data type name
    outputRow[2] = valueMetaInterface.getTypeDesc();
    //assign the field name
    outputRow[3] = valueMetaInterface.getName();
    //assign a string representation of the field value
    outputRow[4] = inputRowMeta.getString(r, fieldnum);
    //emit a row.
    putRow(data.outputRowMeta, outputRow);
  return true;
The main difference with the original code is the addition of the two new output fields, fieldname and fieldtype. In order to obtain the values for these fields, the loop over the fields first obtains the ValueMetaInterface object for the current field. This is done by calling the getValueMeta() method of the RowMetaInterface object and passing the index of the desired field.
Using the ValueMetaInterface object, the field name is obtained using its getName() method. The data type name is obtained by calling its getTypeDesc() method.

Calling the normalizer as subtransformation

Using the improved normalizer is as simple as adding a Mapping-step to your transformation and pointing it to the transformation that contains the normalizer and Mapping input and output specifications:

Download samples

The transformations discussed in this post are available here: These transformations are in the public domain: you can use, copy, redistribute and modify these transformations as you see fit. You are encouraged but not obliged to share any modifications that you make to these examples.

Year-to-Date on Synapse Analytics 5: Using Window Functions

For one of our Just-BI customers we implemented a Year-to-Date calculation in a Azure Synapse Backend. We encountered a couple of approache...