2-7-2 Count Rows, Construct Paged SQL SELECTs
This example handles the paging within the flow itself. The first five processors run a SELECT COUNT(*) and extract the count from the result. Next, a RouteOnAttribute/UpdateAttribute loop generates the offset values used to construct the paged SQL SELECTs. (In MySQL, these take the form "LIMIT x OFFSET y".) The paged SELECT statements are routed to ExecuteSQL, and the rest of the flow handles splitting the result set, converting to JSON, extracting the ID value, constructing the URI, and inserting into MarkLogic.
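The paging idea can be sketched outside NiFi in a few lines of Python. This is only an illustration of how a row count and a page size turn into a series of LIMIT/OFFSET statements; the function name `paged_selects` is hypothetical, and the sketch assumes offsets start at 0, which may not match the counter handling in the flow exactly.

```python
from typing import Iterator


def paged_selects(select: str, from_where: str,
                  total_rows: int, page_size: int) -> Iterator[str]:
    """Yield one SELECT per page, using MySQL's LIMIT/OFFSET syntax."""
    if page_size <= 0:
        raise ValueError("page_size must be positive")
    # Offsets run 0, page_size, 2 * page_size, ... while rows remain.
    for offset in range(0, total_rows, page_size):
        yield f"{select} {from_where} LIMIT {page_size} OFFSET {offset}"


if __name__ == "__main__":
    # Hypothetical values: 25000 matching rows, pages of 10000 rows.
    for stmt in paged_selects("SELECT emp_no", "FROM employees.employees",
                              25000, 10000):
        print(stmt)
```

With 25000 rows and a page size of 10000, the generator produces three statements; the last page simply returns fewer rows than the limit.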
Download Template
Processors:
- GenerateFlowFile ("Start") – starts the flow
- Settings
- Automatically Terminate Relationships: failure
- Scheduling
- Run Schedule: 1000 days (prevents infinite looping)
- UpdateAttribute ("Set SQL")
- Properties
- sql.select.count: SELECT COUNT(*) as count
Note: ExecuteSQL will error on "SELECT COUNT(*)" without the "as count"
- sql.select:
SELECT
employees.emp_no, employees.birth_date, employees.first_name, employees.last_name,
employees.gender, employees.hire_date, employees.dept_emp.dept_no, employees.departments.dept_name,
employees.salaries.salary, employees.titles.title
- sql.from.where:
FROM
employees.employees, employees.dept_emp, employees.departments, employees.salaries, employees.titles
WHERE
employees.employees.emp_no = employees.dept_emp.emp_no AND
employees.dept_emp.to_date > CURDATE() AND
employees.dept_emp.dept_no = employees.departments.dept_no AND
employees.employees.emp_no = employees.salaries.emp_no AND
employees.salaries.to_date > CURDATE() AND
employees.employees.emp_no = employees.titles.emp_no AND
employees.titles.to_date > CURDATE()
- sql.page.size: 10000
- ExecuteSQL ("Count Rows")
- Properties
- Database Connection Pooling Service: DBCPConnectionPool
- SQL select query: ${sql.select.count} ${sql.from.where}
- ConvertAvroToJson
- Properties
- (all default)
- Settings
- Automatically Terminate Relationships: failure
- EvaluateJsonPath ("Get Count from JSON")
- Properties
- Destination: flowfile-attribute
- sql.result.count: $.count (custom property)
- Settings
- Automatically Terminate Relationships: failure, unmatched
- RouteOnAttribute ("Continue if More")
- Properties
- continue: ${sql.result.count:gt(0):and(${counter:le(${sql.result.count})})}
- Settings
- Automatically Terminate Relationships: unmatched
- UpdateAttribute ("Increment Counter")
- Properties
- counter: ${counter:plus(${sql.page.size})}
- UpdateAttribute ("Construct Paged SQL SELECT")
- Properties
- sql.select.paged:
${sql.select} ${sql.from.where}
LIMIT ${sql.page.size}
OFFSET ${counter}
- ExecuteSQL
- Properties
- Database Connection Pooling Service: DBCPConnectionPool
- SQL select query: ${sql.select.paged}
- SplitAvro
- Properties
- (all default)
- Settings
- Automatically Terminate Relationships: failure, original
- ConvertAvroToJson
- Properties
- (all default)
- Settings
- Automatically Terminate Relationships: failure
- EvaluateJsonPath – stores values from JSON in FlowFile attributes
- Properties
- Destination: flowfile-attribute
- emp.no: $.emp_no (custom property)
- Settings
- Automatically Terminate Relationships: failure, unmatched
- UpdateAttribute
- Properties
- marklogic.uri: /employees/${emp.no}.json
- InvokeHTTP – HTTP PUT to MarkLogic REST API /LATEST/documents
- Properties
- HTTP Method: PUT
- Remote URL: http://localhost:8000/LATEST/documents?uri=${marklogic.uri}
- Basic Authentication Username: youruser
- Basic Authentication Password: yourpassword
- Settings
- Check all five checkboxes under "Automatically Terminate Relationships"
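For readers who want to see what the last three processors amount to for a single record, here is a rough Python sketch. It reads emp_no from one JSON record, builds the document URI, and prepares (without sending) the PUT request. The function name `build_put_request` is hypothetical, authentication is left out, and unlike the Remote URL above the sketch percent-encodes the uri parameter.

```python
import json
import urllib.parse
import urllib.request

# Base URL taken from the InvokeHTTP settings above; adjust host and port as needed.
DOCUMENTS_ENDPOINT = "http://localhost:8000/LATEST/documents"


def build_put_request(record_json: str) -> urllib.request.Request:
    """Build (but do not send) the PUT request for one employee record."""
    record = json.loads(record_json)
    emp_no = record["emp_no"]                     # EvaluateJsonPath: $.emp_no
    uri = f"/employees/{emp_no}.json"             # UpdateAttribute: marklogic.uri
    query = urllib.parse.urlencode({"uri": uri})  # percent-encodes the slashes
    return urllib.request.Request(
        url=f"{DOCUMENTS_ENDPOINT}?{query}",
        data=record_json.encode("utf-8"),
        method="PUT",
        headers={"Content-Type": "application/json"},
    )


if __name__ == "__main__":
    # Hypothetical record; real records carry all selected columns.
    req = build_put_request('{"emp_no": 10001, "first_name": "Georgi"}')
    print(req.get_method(), req.full_url)
```

Sending the request (for example with `urllib.request.urlopen`) would additionally need the credentials configured in InvokeHTTP, which this sketch deliberately omits.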