Navigation

4-2 Kafka Integration

Downloads
Setup Instructions
  • Download and Install Kafka
  • Keep ZooKeeper and Kafka Server running
  • Create two queues for the demo and test it
    • bin/kafka-console-producer.sh --broker-list localhost:9092 --topic bankInQue
    • bin/kafka-console-producer.sh --broker-list localhost:9092 --topic bankOutQue
    • bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic bankInQue --from-beginning
    • bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic bankOutQue --from-beginning
  • You can kill all sessions except keep ZooKeeper and Kafka Server running
  • Create a Consumer Group
    • Compile the java class
      • javac -cp '……. /Apache_kafka/kafka_2.11-2.0.0/libs/*' ConsumerGroup.java
    • Run the consumerGroup.class
      • java -cp '/Users/mthahir/WorkSoftware/Apache_kafka/kafka_2.11-2.0.0/libs/*':. \
      • ConsumerGroup bankInQue bankInQueGroup
      • Note: There are several ways to create a consumer group. Make sure You can kill the java job above. We want only one consumer with the group and will be used later in ConsumeKakfa process
  • MarkLogic to Kafka
    • Load the sample JSON file to MarkLogic. Give a collection name and note the collection name.
    • Import the template “MarkLogic_to_Kafka” to NiFi
    • Set the password, database name, port, etc. for QueryMarkLogic processor
    • Check the query and make sure the collection name matches
    • On PutKafka make sure the queue name matches
    • Open a consumer for the queue to see the results
    • Run the flow and you will see on the consumer window information retrieved from MarkLogic
  • Kafka to MarkLogic
    • Import the Kafka_to_MarkLogic template
    • On ConsumeKafka processor check the queue name “bankInQue” and group-id “bankInQueGroup”
    • Configure the database connection controller service for PutMarkLogic
    • Open a Kafka Produce process:
      • bin/kafka-console-producer.sh --broker-list localhost:9092 --topic bankInQue
    • While the NiFi process running, enter the following code on Produce window:
      • 185430584,5,256-44-6366052.
      • Note: You are essentially putting some data to the queue to be processed. You can also open a consume window for the queue bankInQue to see results
    • After completion, there should be a JSON document added to the MarkLogic database