
2-7-2 Count Rows, Construct Paged SQL SELECTs

This example handles the paging within the flow itself. The first five processors run a SELECT COUNT(*) query and extract the row count from the result. Next, a RouteOnAttribute/UpdateAttribute loop generates the offset values used to construct the paged SQL SELECTs (in MySQL, these take the form "LIMIT x OFFSET y"). The paged SELECT statements are routed to ExecuteSQL, and the rest of the flow splits the result set, converts each record to JSON, extracts the ID value, constructs the document URI and inserts the document into MarkLogic.
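The counting and paging logic that the flow spreads across ExecuteSQL, EvaluateJsonPath, RouteOnAttribute and UpdateAttribute can be sketched in plain Python, outside NiFi. This is a minimal sketch, assuming the count result arrives as the single JSON object that ConvertAvroToJSON emits for a one-row result (for example `{"count": 25000}`). The helper names `extract_count` and `paged_selects` and the `order_by` parameter are hypothetical and not part of NiFi; offsets start at 0 and stop before the row count.

```python
import json
from typing import Iterator


def extract_count(count_json: str) -> int:
    """Mimic EvaluateJsonPath reading $.count into sql.result.count."""
    return int(json.loads(count_json)["count"])


def paged_selects(select: str, from_where: str, row_count: int,
                  page_size: int, order_by: str = "") -> Iterator[str]:
    """Yield one 'LIMIT x OFFSET y' SELECT per page of row_count rows."""
    if page_size <= 0:
        raise ValueError("page_size must be positive")
    # ORDER BY is optional (hypothetical parameter); see the note on stable paging.
    order = f" ORDER BY {order_by}" if order_by else ""
    offset = 0
    # Like the RouteOnAttribute check: continue only while rows remain.
    while offset < row_count:
        yield f"{select} {from_where}{order} LIMIT {page_size} OFFSET {offset}"
        # Like "Increment Counter": advance the offset by one page.
        offset += page_size


if __name__ == "__main__":
    total = extract_count('{"count": 25000}')
    for sql in paged_selects("SELECT emp_no", "FROM employees.employees",
                             total, 10000, order_by="emp_no"):
        print(sql)
```

With 25,000 rows and a page size of 10,000, this produces three queries with offsets 0, 10000 and 20000. MySQL returns rows in an unspecified order when a query has no ORDER BY clause, so separate LIMIT/OFFSET pages are not guaranteed to be disjoint; ordering on a key that is unique in the result (the sketch's `emp_no` is an assumption) keeps the pages stable.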

Download Template

Processors:

  • GenerateFlowFile ("Start") – starts the flow
    • Settings
      • Automatically Terminate Relationships: failure
    • Scheduling
      • Run Schedule: 1000 days (the processor effectively fires once, which prevents an infinite loop)
  • UpdateAttribute ("Set SQL")
    • Properties
      • sql.select.count: SELECT COUNT(*) as count
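        Note: ExecuteSQL errors on "SELECT COUNT(*)" without the "as count" alias, most likely because "COUNT(*)" is not a valid Avro field name. The alias also supplies the "count" field that EvaluateJsonPath reads later.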
      • sql.select:
        SELECT
        employees.emp_no, employees.birth_date, employees.first_name, employees.last_name,
        employees.gender, employees.hire_date, employees.dept_emp.dept_no, employees.departments.dept_name,
        employees.salaries.salary, employees.titles.title
      • sql.from.where:
        FROM
        employees.employees, employees.dept_emp, employees.departments, employees.salaries, employees.titles
        WHERE
        employees.employees.emp_no = employees.dept_emp.emp_no AND
        employees.dept_emp.to_date > CURDATE() AND
        employees.dept_emp.dept_no = employees.departments.dept_no AND
        employees.employees.emp_no = employees.salaries.emp_no AND
        employees.salaries.to_date > CURDATE() AND
        employees.employees.emp_no = employees.titles.emp_no AND
        employees.titles.to_date > CURDATE()
      • sql.page.size: 10000
  • ExecuteSQL ("Count Rows")
    • Properties
      • Database Connection Pooling Service: DBCPConnectionPool
      • SQL select query: ${sql.select.count} ${sql.from.where}
  • ConvertAvroToJSON
    • Properties
      • (all default)
    • Settings
      • Automatically Terminate Relationships: failure
  • EvaluateJsonPath ("Get Count from JSON")
    • Properties
      • Destination: flowfile-attribute
      • sql.result.count: $.count (custom property)
    • Settings
      • Automatically Terminate Relationships: failure, unmatched
  • RouteOnAttribute ("Continue if More")
    • Properties
      • continue: ${sql.result.count:gt(0):and(${counter:le(${sql.result.count})})}
    • Settings
      • Automatically Terminate Relationships: unmatched
  • UpdateAttribute ("Increment Counter")
    • Properties
      • counter: ${counter:plus(${sql.page.size})}
  • UpdateAttribute ("Construct Paged SQL SELECT")
    • Properties
      • sql.select.paged:
        ${sql.select} ${sql.from.where}
        LIMIT ${sql.page.size}
        OFFSET ${counter}
  • ExecuteSQL
    • Properties
      • Database Connection Pooling Service: DBCPConnectionPool
      • SQL select query: ${sql.select.paged}
  • SplitAvro
    • Properties
      • (all default)
    • Settings
      • Automatically Terminate Relationships: failure, original
  • ConvertAvroToJSON
    • Properties
      • (all default)
    • Settings
      • Automatically Terminate Relationships: failure
  • EvaluateJsonPath – stores values from the JSON in FlowFile attributes
    • Properties
      • Destination: flowfile-attribute
      • emp.no: $.emp_no (custom property)
    • Settings
      • Automatically Terminate Relationships: failure, unmatched
  • UpdateAttribute
    • Properties
      • marklogic.uri: /employees/${emp.no}.json
  • InvokeHTTP – HTTP PUT to MarkLogic REST API /LATEST/documents
    • Properties
      • HTTP Method: PUT
      • Remote URL: http://localhost:8000/LATEST/documents?uri=${marklogic.uri}
      • Basic Authentication Username: youruser
      • Basic Authentication Password: yourpassword
    • Settings
      • Check all five checkboxes under "Automatically Terminate Relationships"
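The per-record tail of the flow (EvaluateJsonPath on `$.emp_no`, the UpdateAttribute that builds `marklogic.uri`, and the InvokeHTTP PUT) can be approximated with Python's standard library. This is a minimal sketch that builds, but does not send, the request. The `build_put_request` helper is hypothetical, the credentials are the same placeholders used above, and the `application/json` Content-Type is an assumption. Unlike the flow's Remote URL, the sketch URL-encodes the `uri` query parameter.

```python
import base64
import json
import urllib.parse
import urllib.request


def build_put_request(record_json: str, host: str = "http://localhost:8000",
                      user: str = "youruser",
                      password: str = "yourpassword") -> urllib.request.Request:
    """Turn one JSON employee record into a PUT to /LATEST/documents."""
    record = json.loads(record_json)
    # Same pattern as marklogic.uri: /employees/${emp.no}.json
    doc_uri = f"/employees/{record['emp_no']}.json"
    # urlencode percent-encodes the slashes in the document URI.
    query = urllib.parse.urlencode({"uri": doc_uri})
    url = f"{host}/LATEST/documents?{query}"
    # Basic authentication header, as configured on InvokeHTTP.
    token = base64.b64encode(f"{user}:{password}".encode("utf-8")).decode("ascii")
    return urllib.request.Request(
        url,
        data=record_json.encode("utf-8"),
        method="PUT",
        headers={
            "Authorization": f"Basic {token}",
            "Content-Type": "application/json",
        },
    )


if __name__ == "__main__":
    req = build_put_request('{"emp_no": 10001, "first_name": "Georgi"}')
    print(req.get_method(), req.full_url)
    # To actually send it against a running MarkLogic: urllib.request.urlopen(req)
```

The request body is the record's JSON unchanged, which corresponds to the flow sending each split FlowFile's content as the document.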