
2-7-3 Execute a SQL Query for a Nested Array

This example builds on the previous flow to add an array of salary history to the employee JSON.

Because the result of an ExecuteSQL processor always goes into the flowfile content, and we are executing more than one query in series, we have to move the content from the first query to an attribute with the ExtractText processor before the second query runs. After the second query is executed, we move its content to an attribute as well, then build the combined document and write it back to the content with the ReplaceText processor.
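The combine step described above amounts to a small string splice. The following is a minimal Python sketch of that idea, not part of the NiFi flow itself: the helper `substring_before_last` is a hypothetical stand-in for the expression-language function of the same name used in the ReplaceText step, and the two strings are shortened stand-ins for the `sql.result` and `salary.history` attributes.

```python
import json


def substring_before_last(text: str, marker: str) -> str:
    """Return everything before the last occurrence of marker.

    Hypothetical helper that imitates the expression-language call used in
    the ReplaceText step; if marker is absent, the text is returned unchanged.
    """
    head, sep, _ = text.rpartition(marker)
    return head if sep else text


# Shortened stand-ins for the two flowfile attributes (assumed values).
sql_result = '{"emp_no": 212208, "first_name": "Garnet", "title": "Staff"}'
salary_history = (
    '[{"salary": 51128, "from_date": "1994-12-15", "to_date": "1995-12-15"}]'
)

# Drop the closing brace of the first document, append the array under a
# new key, then close the object again.
combined = (
    substring_before_last(sql_result, "}")
    + ',\n"salaryHistory":'
    + salary_history
    + "\n}"
)

# Parsing the result confirms that the splice produced valid JSON.
document = json.loads(combined)
print(json.dumps(document, indent=2))
```

Splicing text this way only works when the first result is a single JSON object whose last `}` closes the whole document, which is the case for the employee record in this exercise.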

Original document:
{
  "emp_no": 212208,
  "birth_date": "1952-10-25",
  "first_name": "Garnet",
  "last_name": "Mitina",
  "gender": "M",
  "hire_date": "1994-12-15",
  "dept_no": "d003",
  "dept_name": "Human Resources",
  "salary": 61665,
  "title": "Staff"
}

New document:
{
  "emp_no": 212208,
  "birth_date": "1952-10-25",
  "first_name": "Garnet",
  "last_name": "Mitina",
  "gender": "M",
  "hire_date": "1994-12-15",
  "dept_no": "d003",
  "dept_name": "Human Resources",
  "salary": 61665,
  "title": "Staff",
  "salaryHistory": [
    { "salary": 51128, "from_date": "1994-12-15", "to_date": "1995-12-15" },
    { "salary": 53798, "from_date": "1995-12-15", "to_date": "1996-12-14" },
    { "salary": 56813, "from_date": "1996-12-14", "to_date": "1997-12-14" },
    { "salary": 57437, "from_date": "1997-12-14", "to_date": "1998-12-14" },
    { "salary": 60751, "from_date": "1998-12-14", "to_date": "1999-12-14" },
    { "salary": 60449, "from_date": "1999-12-14", "to_date": "2000-12-13" },
    { "salary": 61541, "from_date": "2000-12-13", "to_date": "2001-12-13" },
    { "salary": 61665, "from_date": "2001-12-13", "to_date": "9999-01-01" }
  ]
}
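The salaryHistory array in the new document is the result of a per-employee query ordered by from_date. To illustrate how such rows turn into the nested array, here is a rough Python sketch that uses an in-memory SQLite table as a stand-in for the MySQL employees.salaries table. The table definition, the `salary_history` function, and the row for a second employee are assumptions made for this illustration only.

```python
import json
import sqlite3

# In-memory SQLite database standing in for the MySQL employees schema.
conn = sqlite3.connect(":memory:")
conn.row_factory = sqlite3.Row  # lets each row be converted to a dict
conn.execute(
    "CREATE TABLE salaries "
    "(emp_no INTEGER, salary INTEGER, from_date TEXT, to_date TEXT)"
)
conn.executemany(
    "INSERT INTO salaries VALUES (?, ?, ?, ?)",
    [
        # Inserted out of order on purpose to show the effect of ORDER BY.
        (212208, 53798, "1995-12-15", "1996-12-14"),
        (212208, 51128, "1994-12-15", "1995-12-15"),
        # Illustrative row for a different employee; it must be filtered out.
        (10001, 60117, "1986-06-26", "1987-06-26"),
    ],
)


def salary_history(emp_no: int) -> list:
    """Return one employee's salary rows, oldest first, as a list of dicts."""
    rows = conn.execute(
        "SELECT salary, from_date, to_date FROM salaries "
        "WHERE emp_no = ? ORDER BY from_date",
        (emp_no,),
    )
    return [dict(row) for row in rows]


history = salary_history(212208)
print(json.dumps(history, indent=2))
```

The sketch binds the employee number as a query parameter, whereas the flow substitutes an attribute value into the SQL text.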
Processors (the first five are from the previous exercise):
  • GenerateFlowFile ("Start") – starts the flow
    • Settings
      • Automatically Terminate Relationships: failure
    • Scheduling
      • Run Schedule: 1000 days (prevents the flow from being triggered again in an endless loop)
  • UpdateAttribute ("Set SQL")
    • Properties
      • sql.select.count: SELECT COUNT(*) as count
        Note: ExecuteSQL will error on "SELECT COUNT(*)" without the "as count"
      • sql.select:
        SELECT
        employees.emp_no, employees.birth_date, employees.first_name, employees.last_name,
        employees.gender, employees.hire_date, employees.dept_emp.dept_no, employees.departments.dept_name,
        employees.salaries.salary, employees.titles.title
      • sql.from.where:
        FROM
        employees.employees, employees.dept_emp, employees.departments, employees.salaries, employees.titles
        WHERE
        employees.employees.emp_no = employees.dept_emp.emp_no AND
        employees.dept_emp.to_date > CURDATE() AND
        employees.dept_emp.dept_no = employees.departments.dept_no AND
        employees.employees.emp_no = employees.salaries.emp_no AND
        employees.salaries.to_date > CURDATE() AND
        employees.employees.emp_no = employees.titles.emp_no AND
        employees.titles.to_date > CURDATE()
      • sql.page.size: 10000
  • ExecuteSQL ("Count Rows")
    • Properties
      • Database Connection Pooling Service: DBCPConnectionPool
      • SQL select query: ${sql.select.count} ${sql.from.where}
  • ConvertAvroToJson
    • Properties
      • (all default)
    • Settings
      • Automatically Terminate Relationships: failure
  • EvaluateJsonPath ("Get Count from JSON")
    • Properties
      • Destination: flowfile-attribute
      • sql.result.count: $.count (custom property)
    • Settings
      • Automatically Terminate Relationships: failure, unmatched
  • ExtractText ("Move Content to Attribute")
    • Properties
      • sql.result: ^(.*)$ (custom property)
    • Settings
      • Automatically Terminate Relationships: unmatched
  • ExecuteSQL – executes the query for salary history
    • Properties
      • Database Connection Pooling Service: DBCPConnectionPool
      • SQL select query:
        SELECT employees.salaries.salary, employees.salaries.from_date, employees.salaries.to_date
        FROM employees.salaries
        WHERE employees.salaries.emp_no = ${emp.no}
        ORDER BY employees.salaries.from_date
  • ConvertAvroToJson
    • Properties
      • (all default)
    • Settings
      • Automatically Terminate Relationships: failure
  • ExtractText ("Move Content to Attribute")
    • Properties
      • Enable DOTALL Mode: true
      • salary.history: ^(.*)$ (custom property)
    • Settings
      • Automatically Terminate Relationships: unmatched
  • ReplaceText ("Combine Attributes as JSON Content")
    • Properties
      • Search Value: ^(.*)$
      • Replace Value:
        • ${ sql.result:substringBeforeLast('}') },
          "salaryHistory":${salary.history}
          }
    • Settings
      • Automatically Terminate Relationships: failure
  • UpdateAttribute
    • Properties
      • marklogic.uri: /employees/${emp.no}.json
  • InvokeHTTP – HTTP PUT to MarkLogic REST API /LATEST/documents
    • Properties
      • HTTP Method: PUT
      • Remote URL: http://localhost:8000/LATEST/documents?uri=${marklogic.uri}
      • Basic Authentication Username: youruser
      • Basic Authentication Password: yourpassword
    • Settings
      • Check all five checkboxes under "Automatically Terminate Relationships"
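For readers who want to see what the final two processors do outside of NiFi, the sketch below builds an equivalent HTTP PUT request in Python using only the standard library. It is an illustration under assumptions: the function `build_put_request` is hypothetical, the Content-Type header is an addition that the flow does not configure explicitly, and it presumes the server accepts basic authentication. The request is only constructed, not sent.

```python
import base64
import json
import urllib.parse
import urllib.request


def build_put_request(base_url, uri, document, user, password):
    """Build (but do not send) a PUT to the /LATEST/documents endpoint."""
    # URL-encode the document URI so it is safe inside the query string.
    query = urllib.parse.urlencode({"uri": uri})
    # Basic authentication header: base64 of "user:password".
    token = base64.b64encode(f"{user}:{password}".encode("utf-8")).decode("ascii")
    return urllib.request.Request(
        url=f"{base_url}/LATEST/documents?{query}",
        data=json.dumps(document).encode("utf-8"),
        method="PUT",
        headers={
            "Content-Type": "application/json",  # assumption, not set in the flow
            "Authorization": f"Basic {token}",
        },
    )


emp_no = 212208
request = build_put_request(
    "http://localhost:8000",
    f"/employees/{emp_no}.json",  # same pattern as the marklogic.uri attribute
    {"emp_no": emp_no},           # shortened stand-in for the combined document
    "youruser",
    "yourpassword",
)
print(request.get_method(), request.full_url)

# To actually send the request against a running server:
# urllib.request.urlopen(request)
```

Unlike the Remote URL property in the flow, this sketch percent-encodes the uri parameter. For URIs of the form /employees/<number>.json either variant should address the same document.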