
Run Data Hub 4 Flows (Input and Harmonize)

This demo template shows how to orchestrate the Input and Harmonize flows of DHF in a single NiFi template.

The example DHF tutorial can be found at Data Hub Framework Tutorial 4.X. This template follows the Product entity example from that tutorial.

You can download the NiFi template here.

DHF version: 4.3.1

NAR version: MarkLogic NiFi 1.8.0.1

Apache NiFi version: 1.8.0

Input Data

The input data is a CSV file. Looking at the MLCP command used by the input flow, we can derive the important details.

MLCP import command

These are the parameters that map to properties in PutMarkLogic:

            -output_collections "Product,LoadProducts,input"
            -transform_module "/data-hub/4/transforms/mlcp-flow-transform.sjs"
            -transform_param "entity-name=Product,flow-name=Load Products"
          

Because the transform module is SJS, the Server Transform property value is ml:sjsInputFlow. If it were XQuery, we would use ml:inputFlow.

The transform parameters are supplied as separate custom properties prefixed with trans: (see PutMarkLogic below).
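
For reference, the full import command generated by the DHF QuickStart looks roughly like the following. Host, port, credentials, and the input path are placeholders, and some flags may differ in your project; only the collections, transform module, and transform parameters above are taken from the template itself.

            mlcp.sh import -mode "local" \
                -host "localhost" -port "8010" \
                -username "admin" -password "*****" \
                -input_file_path "/path/to/data-hub/input/products/games" \
                -input_file_type "delimited_text" \
                -generate_uri "true" \
                -document_type "json" \
                -output_collections "Product,LoadProducts,input" \
                -output_permissions "rest-reader,read,rest-writer,update" \
                -transform_module "/data-hub/4/transforms/mlcp-flow-transform.sjs" \
                -transform_param "entity-name=Product,flow-name=Load Products"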

Processors

GetFile

Reads files from a watched directory

Properties

Input Directory
/path/to//data-hub/input/products/games
Keep Source File
true

Scheduling

Run Schedule
10000 days

InferAvroSchema

Examines the contents of the incoming data to infer an Avro schema

Properties

Schema Output Destination
flowfile-attribute
Input Content Type
csv
Get CSV Header Definition From Data
true
Avro Record Name
MyCSV

Settings

Automatically Terminate Relationships
failure, original, unsupported content

UpdateAttribute

Note: The inferred schema may not be a true representation of your data. The schema is inferred from a subset of the data, so if the data is not very consistent, the processor is likely to misinterpret the field types and cause errors during conversion.

The UpdateAttribute processor is used here to supply the schema manually by overwriting the inferred.avro.schema attribute.

Properties

Store state
Do not store state
inferred.avro.schema
JSON Schema
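
The value of inferred.avro.schema is the Avro schema itself, written as JSON. A minimal sketch is shown below; the field names (sku, title, price) are illustrative only and should be replaced with the actual columns of your product CSV:

            {
              "type": "record",
              "name": "MyCSV",
              "fields": [
                { "name": "sku",   "type": "string" },
                { "name": "title", "type": "string" },
                { "name": "price", "type": ["null", "string"], "default": null }
              ]
            }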

ConvertCSVToAvro

Properties

Record Schema
${inferred.avro.schema}

Settings

Automatically Terminate Relationships
failure, incompatible

SplitAvro

Properties

(all default)

Settings

Automatically Terminate Relationships
failure, original

ConvertAvroToJSON

Properties

(all default)

Settings

Automatically Terminate Relationships
failure
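
After SplitAvro and ConvertAvroToJSON, each FlowFile carries a single record as a JSON object. With the illustrative schema above, a record would look roughly like this:

            { "sku": "12345", "title": "Example Board Game", "price": "29.99" }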

PutMarkLogic

Properties

DatabaseClient Service
(ML-DHF-InputFlow, pointing to data-hub-STAGING database and the corresponding HTTP port)
Collections
Product,LoadProducts,input
Server Transform
ml:sjsInputFlow
URI Attribute Name
uuid
URI Suffix
.json
trans:entity-name (custom property)
Product
trans:flow-name (custom property)
Load Products

Relationships

Batch_success
All successfully written URIs in a batch are passed, comma-separated, in the URIs FlowFile attribute
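
To make the transform properties concrete: each document written by PutMarkLogic is roughly equivalent to the MarkLogic REST API call below, where the trans: dynamic properties become transform parameters. This is an illustration only (the processor actually writes in batches through the Data Movement SDK), and the URI, host, port, and credentials are placeholders:

            curl --anyauth -u admin:***** -X PUT -H "Content-Type: application/json" -d @product.json \
              "http://localhost:8010/v1/documents?uri=<flowfile-uuid>.json&collection=Product&collection=LoadProducts&collection=input&transform=ml:sjsInputFlow&trans:entity-name=Product&trans:flow-name=Load%20Products"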

ExtensionCallMarkLogic

Properties

DatabaseClient Service
(ML-DHF-HarmonizeFlow, pointing to data-hub-STAGING database and the corresponding HTTP port)
Extension Name
ml:sjsFlow
Requires Input
true
param:entity-name
Product
param:flow-name
Harmonize Products
param:identifiers
${URIs}
param:job-id
12345678
param:target-database
data-hub-FINAL
separator:param:identifiers
,
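
For illustration only, calling the ml:sjsFlow extension directly would look roughly like the call below. The param: dynamic properties are sent as rs: parameters, and the separator property splits the comma-separated URIs attribute into one rs:identifiers parameter per URI. Host, port, credentials, and the URIs are placeholders:

            curl --anyauth -u admin:***** -X POST \
              "http://localhost:8010/v1/resources/ml:sjsFlow?rs:entity-name=Product&rs:flow-name=Harmonize%20Products&rs:job-id=12345678&rs:target-database=data-hub-FINAL&rs:identifiers=uri-1.json&rs:identifiers=uri-2.json"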

Run Processors

Select each of the processors and start them.

Template
