Navigation

Retrieve Data From A Relational Source

There are several available processors for working with relational data.

Available Relational Processors

Descriptions for these processors were taken directly from the processor documentation. For our cookbook we will be using GenerateTableFetch and ExecuteSQL together in order to page over the result set.

ExecuteSQL

Executes provided SQL select query. Query result will be converted to Avro format. Streaming is used so arbitrarily large result sets are supported. This processor can be scheduled to run on a timer, or cron expression, using the standard scheduling methods, or it can be triggered by an incoming FlowFile. If it is triggered by an incoming FlowFile, then attributes of that FlowFile will be available when evaluating the select query, and the query may use the ? to escape parameters. In this case, the parameters to use must exist as FlowFile attributes with the naming convention sql.args.N.type and sql.args.N.value, where N is a positive integer. The sql.args.N.type is expected to be a number indicating the JDBC Type. The content of the FlowFile is expected to be in UTF-8 format. FlowFile attribute ‘executesql.row.count’ indicates how many rows were selected.

GenerateTableFetch

Generates SQL select queries that fetch “pages” of rows from a table. The partition size property, along with the table’s row count, determine the size and number of pages and generated FlowFiles. In addition, incremental fetching can be achieved by setting Maximum-Value Columns, which causes the processor to track the columns’ maximum values, thus only fetching rows whose columns’ values exceed the observed maximums. This processor is intended to be run on the Primary Node only.

This processor can accept incoming connections; the behavior of the processor is different whether incoming connections are provided:

  • If no incoming connection(s) are specified, the processor will generate SQL queries on the specified processor schedule. Expression Language is supported for many fields, but no flow file attributes are available. However the properties will be evaluated using the Variable Registry.
  • If incoming connection(s) are specified and no flow file is available to a processor task, no work will be performed.
  • If incoming connection(s) are specified and a flow file is available to a processor task, the flow file’s attributes may be used in Expression Language for such fields as Table Name and others. However, the Max-Value Columns and Columns to Return fields must be empty or refer to columns that are available in each specified table.

QueryDatabaseTable

Generates a SQL select query, or uses a provided statement, and executes it to fetch all rows whose values in the specified Maximum Value column(s) are larger than the previously-seen maxima. Query result will be converted to Avro format. Expression Language is supported for several properties, but no incoming connections are permitted. The Variable Registry may be used to provide values for any property containing Expression Language. If it is desired to leverage flow file attributes to perform these queries, the GenerateTableFetch and/or ExecuteSQL processors can be used for this purpose. Streaming is used so arbitrarily large result sets are supported. This processor can be scheduled to run on a timer or cron expression, using the standard scheduling methods. This processor is intended to be run on the Primary Node only. FlowFile attribute ‘querydbtable.row.count’ indicates how many rows were selected.

NiFi Template Example

You can download a copy of the NiFi template used for this example here.

Setup

To use NiFi with relational you need a relational database and a JDBC driver. This cookbook will be using a dataset stored in MySql. In addition to requiring NiFi and MarkLogic setup (for instructions see Getting Started), you will need the following software to follow along:

Dataset

Our relational cookbook uses the sample employee database located here: https://github.com/datacharmer/test_db

Follow the instructions at the linked dataset repository to load your data.

There are about 300,000 employees in the database, so we will assume that that is too big for ExecuteSQL without paging.

Run the following SQL query, which will create the employee_detail view we will use for extracting data.

USE employees;
            CREATE VIEW `employee_detail` SELECT
            employees.emp_no, employees.birth_date, employees.first_name, employees.last_name,
            employees.gender, employees.hire_date, employees.dept_emp.dept_no, employees.departments.dept_name,
            employees.salaries.salary, employees.titles.title
            FROM
            employees.employees, employees.dept_emp, employees.departments, employees.salaries, employees.titles
            WHERE
            employees.employees.emp_no = employees.dept_emp.emp_no AND
            employees.dept_emp.to_date > CURDATE() AND
            employees.dept_emp.dept_no = employees.departments.dept_no AND
            employees.employees.emp_no = employees.salaries.emp_no AND
            employees.salaries.to_date > CURDATE() AND
            employees.employees.emp_no = employees.titles.emp_no AND
            employees.titles.to_date > CURDATE()
          

And here’s what the results form the employee_detail view will look like: Database results

Template Processor Setup

GenerateTableFetch Settings

Properties

Database Connection Pooling Service
DBCPConnectionPool
Database Type
Generic
Table Name
employee_detail
Columns to Return
*

Settings

Automatically Terminate Relationships: failure

Scheduling

Run Schedule
1 day (prevents infinitely looping)

ExecuteSQL Settings

Properties

Database Connection Pooling Service
DBCPConnectionPool
SQL select query
(leave this blank)

SplitAvro Settings

Properties

(all default)

Settings

Automatically Terminate Relationships
failure, original

ConvertAvroToJson Settings

Properties

(all default)

Settings

Automatically Terminate Relationships
failure

EvaluateJsonPath Settings

Store values from JSON in FlowFile properties

Properties

Destination
flowfile-attribute
emp.no
$.emp_no (custom property)

Settings

Automatically Terminate Relationships
failure, unmatched

UpdateAttribute Settings

Properties

marklogic.uri
/employees/${emp.no}.json

PutMarkLogic Settings

Properties

DatabaseClient Service
DefaultMarkLogicDatabaseClientService
Collections
employees
URI Attribute Name
marklogic.uri

Settings

Check all check boxes under “Automatically Terminate Relationships”

Run Processors

Select each of the processors and start them.