Tuesday, March 20, 2012

A Generic Normalizer step for Kettle

UPDATE: if you're interested in this article you might be interested in the follow-up article as well: A Generic Normalizer for Pentaho Data integration - Revisited.


Kettle (a.k.a. Pentaho Data Integration) offers the standard Row Normalizer step to "unpivot" columns to rows. However, this step requires some configuration which presumes its input stream is static, and its structure is known. In this post, I explain how to construct a simple User-defined java class step that implements a generic Row Normalizer step that can unpivot an arbitrary input stream without manual configuration.

The Row Normalizer step

Kettle (a.k.a. Pentaho Data Integration) offers a standard step to "unpivot" columns to rows. This step is called the Row Normalizer. You can see it in action in the screenshot below:

In the screenshot, the input is a table of columns id, first name, and last name. The output is a table of columns id, fieldname, and value. The id column is preserved, but for each row coming from the input stream, two rows are created in the output stream: 1 for the first name field, and 1 for the last name field.

Essentially, the Row Normalizer step in this example is configured to treat the first name and last name fields as a repeating group. The repeating group is untangled by dumping all values for either field in the value column. The fieldname column is used to mark the kind of value: some values are of the "first name field" kind (in case they came from the original first name input field), and some are of the "last name field" kind (when they derive from the last name input field).
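To make the operation concrete outside of Kettle, here is a minimal plain-Java sketch of the same unpivot. It is only an illustration, not how the Row Normalizer is implemented: the class name FixedUnpivot, the method name unpivot and the sample row are made up, and the field labels are hard-coded on purpose, mirroring the manual configuration the step requires.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration of unpivoting a known, fixed set of fields.
final class FixedUnpivot {

    // Labels for the repeating group; hard-coded, like the manual step configuration.
    static final String[] LABELS = {"first name field", "last name field"};

    // Turns rows of {id, first name, last name} into rows of {id, fieldname, value}.
    static List<String[]> unpivot(List<String[]> rows) {
        List<String[]> out = new ArrayList<>();
        for (String[] row : rows) {
            for (int i = 0; i < LABELS.length; i++) {
                // row[0] is the id, which is preserved on every output row.
                out.add(new String[] {row[0], LABELS[i], row[i + 1]});
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<String[]> rows = new ArrayList<>();
        rows.add(new String[] {"1", "John", "Doe"}); // made-up sample data
        for (String[] triple : unpivot(rows)) {
            System.out.println(String.join(" | ", triple));
        }
    }
}
```

Each input row yields exactly two output rows, one per field in the repeating group, and the id travels along unchanged.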

There are several use cases for the operation performed by the Row normalizer step. It could be used to break down a genuine repeating group in order to create a more normalized dataset. Or you might need to convert a relational dataset into a graph consisting of subject-predicate-object tuples for loading a triple store. Or maybe you want to turn a table into a fine-grained stream of changes for auditing.

The problem

The Row normalizer step works great for streams that have a structure that is known in advance. The structure needs to be known in advance in order to specify those fields that are to be considered as repeating group in the step configuration so they can be broken out into separate kinds.

Sometimes, you don't know the structure of the input stream in advance, or it is just too inconvenient to specify it manually. In these cases, you'd really wish you could somehow unpivot any field that happens to be part of the input stream. In other words, you'd need a generic Row Normalizer step.

The Solution

In Kettle, there's always a solution, and often more. Here, I'd like to present a solution to dynamically unpivot an arbitrary input stream using a user-defined java class step.

Below is a screenshot of the step configuration:

This configuration allows the step to take an arbitrary input stream and normalize it into a collection of triples consisting of:
  1. A rownum column. This column delivers generated integer values, and each distinct value uniquely identifies a single input row.
  2. A fieldnum column. This is a generated integer value that uniquely identifies a field within each input row.
  3. A value column. This is a string column that contains the value that appears in the field identified by the fieldnum column within the row identified by the rownum value.
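The shape of these triples can be sketched in plain Java, independent of Kettle. This is a simplified illustration under my own assumptions: the names GenericUnpivot, Triple and normalize are hypothetical, and String.valueOf merely stands in for whatever string conversion Kettle applies to each field.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration of the (rownum, fieldnum, value) output shape.
final class GenericUnpivot {

    // One output triple: generated row number, generated field number, value as string.
    static final class Triple {
        final long rownum;
        final long fieldnum;
        final String value;

        Triple(long rownum, long fieldnum, String value) {
            this.rownum = rownum;
            this.fieldnum = fieldnum;
            this.value = value;
        }
    }

    // Emits one triple per field of every input row; nothing is known about the fields.
    static List<Triple> normalize(List<Object[]> rows) {
        List<Triple> out = new ArrayList<>();
        long rownum = 0;
        for (Object[] row : rows) {
            rownum++; // generated key, starts at 1
            for (int i = 0; i < row.length; i++) {
                // Field numbers also start at 1; null values stay null.
                String value = row[i] == null ? null : String.valueOf(row[i]);
                out.add(new Triple(rownum, i + 1, value));
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Object[]> rows = new ArrayList<>();
        rows.add(new Object[] {1, "John", "Doe"}); // made-up sample data
        for (Triple t : normalize(rows)) {
            System.out.println(t.rownum + " | " + t.fieldnum + " | " + t.value);
        }
    }
}
```

Note that every field is unpivoted, including any field that happens to be a key in the input, because nothing about the input structure is assumed.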

The Row Normalizer versus the UDJC generic Normalizer

For the input data set mentioned in the initial example, the output generated by this UDJC step is shown below:
There are a few differences with regard to the output of Kettle's Row Normalizer step:
  1. One obvious difference is that the Row Normalizer step has the ability to attach names to the values, whereas the UDJC step only delivers a generated field position. On the one hand, it's really nice to have field names. On the other hand, this is also one of the weaknesses of the Row Normalizer step, because providing the names must be done manually.
  2. Another difference is that the UDJC step delivers 3 output rows for each input row, instead of the 2 rows delivered by the Row Normalizer step. The "extra" row is due to the id column. Because the id column is the key of the original input data, the Row Normalizer step was configured to only unpivot the first name and last name fields, keeping the id field unscathed: this allows any downstream steps to see which fields belong to which row. The UDJC step however does not know which field or fields form the key of the input stream. Instead, it generates its own key, the rownum field, and the id field is simply treated like any other field and unpivoted, just like the first name and last name fields. So the main difference is that the downstream steps need to use the generated rownum field to see which fields belong to which row.

The Code

The code and comments are pretty straightforward:
static long rownum = 0;
static RowMetaInterface inputRowMeta;
static long numFields;
static String[] fieldNames;

public boolean processRow(StepMetaInterface smi, StepDataInterface sdi) throws KettleException
{
  // get the current row
  Object[] r = getRow();

  // If the row object is null, we are done processing.
  if (r == null) {
    setOutputDone();
    return false;
  }

  // If this is the first row, cache some metadata.
  // We will reuse this metadata in processing the rest of the rows.
  if (first) {
    first = false;
    inputRowMeta = getInputRowMeta();
    fieldNames = inputRowMeta.getFieldNames();
    numFields = fieldNames.length;
  }

  // Generate a new id number for the current row.
  rownum += 1;

  // Generate one output row for each field in the input stream.
  int fieldnum;
  for (fieldnum = 0; fieldnum < numFields; fieldnum++) {
    Object[] outputRow = new Object[3];
    outputRow[0] = rownum;
    // Assign the field id. Note that we need to cast to long to match Kettle's type system.
    outputRow[1] = (long)fieldnum+1;
    outputRow[2] = inputRowMeta.getString(r, fieldnum);
    putRow(data.outputRowMeta, outputRow);
  }

  return true;
}

Getting Field information

So the UDJC step only generates a number to identify the field. For some purposes it may be useful to pull in other information about the fields, like their name, data type or data format. While we could also do this directly in the UDJC step by writing more Java code, it is easier and more flexible to use some of Kettle's built-in steps:
  1. The Get Metadata Structure step. This step takes an input stream, and generates one row for each distinct field. Each of these rows has a number of columns that describe the field from the input stream. One of the fields is a Position field, which uniquely identifies each field from the input stream using a generated integer, just like the fieldnum field from our UDJC step does.
  2. The Stream Lookup step. This step allows us to combine the output stream of our UDJC step with the output of the Get Metadata Structure step. By matching the Position field of the Get Metadata Structure step with the fieldnum field of the UDJC step, we can look up any metadata fields that we happen to find useful.
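Conceptually, the lookup amounts to a keyed join on the field position. The plain-Java sketch below illustrates that idea only; it does not use Kettle's API. The names FieldNameLookup, positionsToNames and describe are hypothetical, and I am assuming positions start at 1, matching the fieldnum values produced by the code above.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical illustration of joining fieldnum against field metadata by position.
final class FieldNameLookup {

    // Plays the role of the metadata stream: position (assumed 1-based) -> field name.
    static Map<Long, String> positionsToNames(String[] fieldNames) {
        Map<Long, String> byPosition = new LinkedHashMap<>();
        for (int i = 0; i < fieldNames.length; i++) {
            byPosition.put((long) (i + 1), fieldNames[i]);
        }
        return byPosition;
    }

    // Plays the role of the lookup: enrich one (rownum, fieldnum, value) triple with a name.
    static String describe(long rownum, long fieldnum, String value, Map<Long, String> names) {
        String name = names.get(fieldnum);
        if (name == null) {
            name = "(unknown field)"; // no metadata row matched this position
        }
        return rownum + " | " + name + " | " + value;
    }

    public static void main(String[] args) {
        Map<Long, String> names = positionsToNames(new String[] {"id", "first name", "last name"});
        System.out.println(describe(1, 2, "John", names)); // made-up sample data
    }
}
```

Because the metadata is looked up rather than configured, the field names follow whatever the input stream happens to contain.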

Below is a screenshot that shows how all these steps work together:
And here endeth the lesson.

Running Pentaho BI Server behind a proxy

To whom it may concern - a quick hands-on guide for running the Pentaho BI server behind a proxy


This post assumes you're running Ubuntu Linux (or at least a Debian) and that you have both the Apache HTTP server and the Pentaho BI server installed.

Apache HTTP Server

If you haven't got apache installed, this is your line:
$ sudo apt-get install apache2

You can then control the apache2 HTTP server using the apache2ctl script. For instance, to start it, do:
$ sudo apache2ctl start

Once it's started you can navigate to its homepage to verify that it is running:

You can stop it by running
$ sudo apache2ctl stop

If you're changing apache's configuration, you need to restart it to allow changes to take effect using this command:
$ sudo apache2ctl restart


Pentaho relies on Java. If not installed already you can get it like this:
$ sudo apt-get install openjdk-6-jdk

Pentaho BI Server

If you haven't got the Pentaho BI Server, download the latest version from SourceForge, and unpack the archive in some location you find convenient. (For development purposes I simply keep and run it in a subdirectory of my home directory.)

You can start the Pentaho BI Server by cd-ing into the biserver-ce directory and then running:
$ ./start-pentaho.sh

You can then navigate to its homepage:

(Simply navigating to http://localhost:8080 will automatically redirect you there too).

It can be useful to monitor the log while it's running:
$ tail -f tomcat/logs/catalina.out

If you want to change something in Pentaho's configuration, you need to stop the server and then restart it. This is done by running:
$ ./stop-pentaho.sh

Configuring Proxy support for Apache

Boris Kuzmanovic wrote an excellent post on setting up proxy support for Apache. My summary (and adjustment) follows below.

First, change the apache configuration to load the required proxy modules:
$ sudo a2enmod proxy
$ sudo a2enmod proxy_http

Then, edit any site definitions to use the proxy. I just modified the default site definition:
$ sudo geany /etc/apache2/sites-enabled/000-default

Inside the <VirtualHost> section, I added these snippets immediately above the </VirtualHost> that ends the section:

<Location /pentaho/>
ProxyPass http://localhost:8080/pentaho/
ProxyPassReverse http://localhost:8080/pentaho/
SetEnv proxy-chain-auth
</Location>

<Location /pentaho-style/>
ProxyPass http://localhost:8080/pentaho-style/
ProxyPassReverse http://localhost:8080/pentaho-style/
SetEnv proxy-chain-auth
</Location>

After making these changes, we need to restart apache:
$ sudo apache2ctl restart

Requests for these two <Location> paths are now effectively tunneled to the respective locations on the Pentaho BI Server, and the responses are passed back the same way.
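For readers who like to see the mapping spelled out, here is a heavily simplified Java sketch of what the two directives do conceptually for the /pentaho/ location. It is not how Apache implements them: the class and method names are made up, only the path prefix is handled, and real ProxyPassReverse rewrites redirect headers to full front-end URLs rather than bare paths.

```java
// Hypothetical, simplified model of the ProxyPass / ProxyPassReverse pair.
final class ProxyMappingSketch {

    static final String FRONT_PREFIX = "/pentaho/";
    static final String BACKEND_PREFIX = "http://localhost:8080/pentaho/";

    // ProxyPass direction: map an incoming request path to the backend URL.
    // Returns null when the path is not covered by this location.
    static String toBackend(String requestPath) {
        if (!requestPath.startsWith(FRONT_PREFIX)) {
            return null;
        }
        return BACKEND_PREFIX + requestPath.substring(FRONT_PREFIX.length());
    }

    // ProxyPassReverse direction: map a backend URL found in a redirect header
    // back to the path the client should use. Other URLs are left untouched.
    static String toFrontend(String locationHeader) {
        if (!locationHeader.startsWith(BACKEND_PREFIX)) {
            return locationHeader;
        }
        return FRONT_PREFIX + locationHeader.substring(BACKEND_PREFIX.length());
    }

    public static void main(String[] args) {
        System.out.println(toBackend("/pentaho/Home"));
        System.out.println(toFrontend("http://localhost:8080/pentaho/Login"));
    }
}
```

The reverse mapping is what keeps redirects issued by the backend from leaking its port 8080 address to the client.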

Using mod_proxy_ajp instead of proxy_http

While the regular HTTP proxy simply works, there is a better, more tightly integrated solution. The regular HTTP proxy basically handles HTTP requests received by the Apache Httpd server by sending a new, equivalent HTTP request through to the Tomcat server. Likewise, Tomcat's HTTP response is then sent back as a new, equivalent HTTP response to the source of the original request.

So, that's twice a transport over HTTP.

Things can be improved by routing the incoming HTTP request to the tomcat server using a binary protocol called the AJP (Apache JServ) protocol. (For a detailed comparison, see this excellent comparison between HTTP/HTTPS and AJP.)

Fortunately, the steps to set up an AJP proxy are almost identical to those for setting up a regular HTTP proxy. First, enable the AJP proxy module:
$ sudo a2enmod proxy
$ sudo a2enmod proxy_ajp

(Note that the proxy module was already enabled as part of setting up the regular http proxy. The line is repeated here for completeness, but not necessary if you completed the steps for setting up support for the regular http proxy. You can enable either or both the proxy_http and the proxy_ajp modules, and both require the proxy module.)

Then, we again edit the site configuration to use the proxy. Since the locations /pentaho/ and /pentaho-style/ were already used, we first comment those out:

#<Location /pentaho/>
# ProxyPass http://localhost:8080/pentaho/
# ProxyPassReverse http://localhost:8080/pentaho/
# SetEnv proxy-chain-auth
#</Location>

#<Location /pentaho-style/>
# ProxyPass http://localhost:8080/pentaho-style/
# ProxyPassReverse http://localhost:8080/pentaho-style/
# SetEnv proxy-chain-auth
#</Location>

Then we add equivalent lines going via the AJP proxy:

ProxyPass /pentaho ajp://localhost:8009/pentaho
ProxyPassReverse /pentaho ajp://localhost:8009/pentaho

ProxyPass /pentaho-style ajp://localhost:8009/pentaho-style
ProxyPassReverse /pentaho-style ajp://localhost:8009/pentaho-style

(The bit that goes ajp://localhost:8009 refers to the ajp service that is running on port 8009 of tomcat by default.)

Again we have to restart the apache service for the changes to take effect:
$ sudo apache2ctl restart


Thanks to Paul Stöllberger, Pedro Alves and Tom Barber for valuable feedback and background information regarding AJP.

