Run Data Hub 4 Flows (Input and Harmonize)
This demo template shows how to orchestrate the DHF input and harmonize flows in a single NiFi template.
The example DHF tutorial can be found at: Data Hub Framework Tutorial 4.X. This template follows the Product entity example from the DHF 4.x tutorial.
You can download the NiFi template here.
DHF version: 4.3.1
NAR version: MarkLogic NiFi 1.8.0.1
Apache NiFi version: 1.8.0
Input Data
The input data is a CSV file. The MLCP command for the input flow gives us the details we need to configure ingestion in NiFi.
These are the MLCP parameters that map to properties in PutMarkLogic:
-output_collections "Product,LoadProducts,input"
-transform_module "/data-hub/4/transforms/mlcp-flow-transform.sjs"
-transform_param "entity-name=Product,flow-name=Load Products"
Because the transform module is SJS, the Server Transform property value is ml:sjsInputFlow. If it were XQuery, we would use ml:inputFlow.
The transform parameters are given as separate custom properties prefixed with trans: (see PutMarkLogic below).
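As a sketch of the mapping described above, the Python function below (a hypothetical helper, not part of the template or of MLCP) turns the three MLCP arguments into PutMarkLogic property values. It assumes XQuery transform modules end in .xqy.

```python
# Hypothetical helper: maps the MLCP input-flow arguments to PutMarkLogic
# property values. It is not part of the NiFi template or of MLCP.


def mlcp_to_putmarklogic(output_collections, transform_module, transform_param):
    props = {"Collections": output_collections}
    # SJS transform modules map to ml:sjsInputFlow, XQuery (.xqy) ones to ml:inputFlow.
    if transform_module.endswith(".sjs"):
        props["Server Transform"] = "ml:sjsInputFlow"
    elif transform_module.endswith(".xqy"):
        props["Server Transform"] = "ml:inputFlow"
    else:
        raise ValueError("unexpected transform module: " + transform_module)
    # Each key=value pair of -transform_param becomes a trans:<key> custom property.
    for pair in transform_param.split(","):
        key, _, value = pair.partition("=")
        props["trans:" + key.strip()] = value.strip()
    return props


props = mlcp_to_putmarklogic(
    "Product,LoadProducts,input",
    "/data-hub/4/transforms/mlcp-flow-transform.sjs",
    "entity-name=Product,flow-name=Load Products",
)
for name, value in props.items():
    print(f"{name} = {value}")
```

The printed values match the PutMarkLogic properties listed under Processors.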
Processors
GetFile
Reads files from a watched directory
Properties
- Input Directory
- /path/to/data-hub/input/products/games
- Keep Source File
- true
Scheduling
- Run Schedule
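- 10000 days (GetFile effectively runs only once after it is started; because Keep Source File is true, a short schedule would ingest the same files again on every run)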
InferAvroSchema
Examines the contents of the incoming data to infer an Avro schema
Properties
- Schema Output Destination
- flowfile-attribute
- Input Content Type
- csv
- Get CSV Header Definition From Data
- true
- Avro Record Name
- MyCSV
Settings
- Automatically Terminate Relationships
- failure, original, unsupported content
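InferAvroSchema decides field types from the data it sees, so an unrepresentative sample can mislead it. The toy Python illustration below is not NiFi's inference algorithm; it simply infers a column as double when every sampled value parses as a number. The CSV columns and values are hypothetical.

```python
import csv
import io

# Hypothetical data: price looks numeric in the first two rows only.
DATA = """sku,title,price
1001,Chess,12.99
1002,Go,25.00
1003,Checkers,call for price
"""


def infer_type(values):
    """Toy rule: 'double' if every sampled value parses as a float, else 'string'."""
    for value in values:
        try:
            float(value)
        except ValueError:
            return "string"
    return "double"


def infer_schema(text, sample_size):
    """Infer one type per column from the first sample_size data rows only."""
    rows = list(csv.DictReader(io.StringIO(text)))[:sample_size]
    return {col: infer_type([row[col] for row in rows]) for col in rows[0]}


print(infer_schema(DATA, sample_size=2))  # {'sku': 'double', 'title': 'string', 'price': 'double'}
print(infer_schema(DATA, sample_size=3))  # {'sku': 'double', 'title': 'string', 'price': 'string'}
```

With two sampled rows, price is inferred as double, so the third row would fail conversion. Note that sku is also treated as a number, which would lose leading zeros in codes that have them.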
UpdateAttribute
Note: The inferred schema may not be a true representation of your data. InferAvroSchema infers the schema from a subset of the data, so if the data is not consistent it is likely to misinterpret some field types, and the conversion will then fail with errors.
The UpdateAttribute processor is used here to provide the schema manually by overwriting the “inferred.avro.schema” attribute.
Properties
- Store state
- Do not store state
- inferred.avro.schema
- (your Avro schema, written as JSON)
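One way to supply the schema manually is to declare every column as a nullable string, so no value can fail type conversion, at the cost of untyped fields. The minimal Python sketch below prints such a schema; the column names are hypothetical (use your CSV's header row), and the record name reuses MyCSV from InferAvroSchema. Avro field names must start with a letter or underscore and contain only letters, digits, and underscores, so some CSV headers may need renaming.

```python
import json


def all_string_avro_schema(header, record_name="MyCSV"):
    """Build an Avro record schema that declares every column as a nullable string."""
    return {
        "type": "record",
        "name": record_name,
        "fields": [
            # A ["null", "string"] union accepts any text value and also allows null;
            # with "null" first in the union, the default must be null.
            {"name": column, "type": ["null", "string"], "default": None}
            for column in header
        ],
    }


# Hypothetical header row; replace it with your CSV's real column names.
schema = all_string_avro_schema(["sku", "title", "price"])
# Paste this JSON as the value of the inferred.avro.schema property.
print(json.dumps(schema, indent=2))
```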
ConvertCSVToAvro
Properties
- Record Schema
- ${inferred.avro.schema}
Settings
- Automatically Terminate Relationships
- failure, incompatible
SplitAvro
Properties
(all default)
Settings
- Automatically Terminate Relationships
- failure, original
ConvertAvroToJSON
Properties
(all default)
Settings
- Automatically Terminate Relationships
- failure
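Together, ConvertCSVToAvro, SplitAvro, and ConvertAvroToJSON turn one CSV file into one JSON document per row, which PutMarkLogic then writes. The Python sketch below reproduces only that end result with the standard library; it is not how NiFi does it. The sample data is hypothetical, and all values stay strings, as they would under an all-string schema.

```python
import csv
import io
import json


def csv_to_json_documents(text):
    """Yield one JSON string per CSV data row, keyed by the header row."""
    for row in csv.DictReader(io.StringIO(text)):
        yield json.dumps(row)


DATA = "sku,title,price\n1001,Chess,12.99\n1002,Go,25.00\n"
for doc in csv_to_json_documents(DATA):
    print(doc)
# {"sku": "1001", "title": "Chess", "price": "12.99"}
# {"sku": "1002", "title": "Go", "price": "25.00"}
```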
PutMarkLogic
Properties
- DatabaseClient Service
- (ML-DHF-InputFlow, pointing to data-hub-STAGING database and the corresponding HTTP port)
- Collections
- Product,LoadProducts,input
- Server Transform
- ml:sjsInputFlow
- URI Attribute Name
- uuid
- URI Suffix
- .json
- trans:entity-name (custom property)
- Product
- trans:flow-name (custom property)
- Load Products
Relationships
- batch_success
- Emits one FlowFile per batch whose URIs attribute lists all successfully written URIs, comma-separated (consumed by ExtensionCallMarkLogic as ${URIs})
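With the URI attribute set to uuid and the suffix set to .json, each document URI is the FlowFile's uuid attribute followed by .json, assuming no URI prefix is configured. The Python sketch below simulates a small batch (NiFi assigns the real uuid values) and the comma-separated URIs attribute carried on the batch_success FlowFile.

```python
import uuid


def document_uri(flowfile_uuid, suffix=".json"):
    # Assumes URI attribute = uuid, URI suffix = .json, and no URI prefix.
    return flowfile_uuid + suffix


# Simulated batch: NiFi assigns each FlowFile its own uuid attribute.
batch = [str(uuid.uuid4()) for _ in range(3)]
uris = [document_uri(flowfile_uuid) for flowfile_uuid in batch]

# The batch_success FlowFile carries every successful URI in one attribute.
uris_attribute = ",".join(uris)
print(uris_attribute)
```

Because UUID-based URIs never contain commas, a comma is a safe separator for the URIs attribute; URIs built from data that may contain commas would need a different separator.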
ExtensionCallMarkLogic
Properties
- Database Client Service
- (ML-DHF-HarmonizeFlow, pointing to data-hub-STAGING database and the corresponding HTTP port)
- Extension Name
- ml:sjsFlow
- Requires Input
- true
- param:entity-name
- Product
- param:flow-name
- Harmonize Products
- param:identifiers
- ${URIs}
- param:job-id
- 12345678
- param:target-database
- data-hub-FINAL
- separator:param:identifiers
- ,
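The harmonize call can be pictured as a request to the ml:sjsFlow resource extension. The Python sketch below builds such a request URL, assuming that ExtensionCallMarkLogic forwards each param: property as an rs: request parameter (the MarkLogic REST API convention for resource extensions) and that separator:param:identifiers splits ${URIs} into repeated rs:identifiers values. The host, port, and URIs are hypothetical, and no request is sent.

```python
from urllib.parse import parse_qs, urlencode, urlsplit


def build_extension_url(base_url, extension, params, separators):
    """Build the resource-extension URL ExtensionCallMarkLogic is assumed to request."""
    query = []
    for name, value in params.items():
        separator = separators.get(name)
        values = value.split(separator) if separator else [value]
        # Assumption: each param:<name> property is sent as an rs:<name> parameter.
        query.extend(("rs:" + name, v) for v in values)
    return f"{base_url}/v1/resources/{extension}?{urlencode(query)}"


uris = "a1.json,b2.json"  # hypothetical ${URIs} value from batch_success
url = build_extension_url(
    "http://localhost:8010",  # hypothetical STAGING app server
    "ml:sjsFlow",
    {
        "entity-name": "Product",
        "flow-name": "Harmonize Products",
        "identifiers": uris,
        "job-id": "12345678",
        "target-database": "data-hub-FINAL",
    },
    separators={"identifiers": ","},
)
print(url)
# Each identifier arrives as its own rs:identifiers value.
print(parse_qs(urlsplit(url).query)["rs:identifiers"])  # ['a1.json', 'b2.json']
```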
Run Processors
Select each of the processors and start them.