Sunday 29 April 2018

How to use Kafka consumer in pentaho 8


1.Create  main and sub transformation as discussed below
2.call sub transformation from main Transformation

Note:-Sub transformation required for Kafka consumer step




Download working sample from here
https://drive.google.com/open?id=1Z4C2miczU0BnB4n3r1LcpN78v2UjefWQ




In the kaka transformation,

1.We are using direct bootstrap server on connection. 
2. we added the consumer group "test-consumer-group1" change consumer group after every run to retrieve Kafka message from start.
Important:-if you not change consumer group, kafka will not retrieve any message unless any new message arrived to topic.
like test-consumer-group1,test-consumer-group2,test-consumer-group3 .....
3. Changed the auto.offset.reset to "earliest" on options tab.

In the sub transformation.

In "Get records from stream" step, we gave the below fields Fieldname Type key None
message None
topic None
partition None
offset None
timestamp Timestamp

Kafka Version: kafka_2.11-1.1.0

I uploaded a sample .ktr that works







References:-https://help.pentaho.com/Documentation/8.0/Products/Data_Integration/Transformation_Step_Reference/Kafka_Consumer

Saturday 28 April 2018

Load data to Kafka from pentaho




Set Up Kafka and use from pentaho


In this tutorial, I will show how to set up and run Apache Kafka on Windows/linux and how to load/read data from pentaho. 
Kafka comes with two sets of scripts to run Kafka. In the bin folder, the sh files are used to set up Kafka in a Linux environment. In the bin\windows folder, there are also some bat files corresponds to those sh files which are supposed to work in a Windows environment.  Some say you can use Cygwin to execute the sh scripts in order to run Kafka. However, they are many additional steps involved, and in the end you may not get the desired outcome. With the correct bat files, there is no need to use Cygwin, and only Server JRE is required to run Kafka on Windows.

Step 0: Preparation

Install Java 8 SE Server JRE/JDK

You need Java SE Server JRE in order to run Kafka. If you have JDK installed, you already have Server JRE installed, just check if the folder {JRE_PATH}\bin\server exists. If it is not, follow the following steps to install Java SE Server JRE:
  1. Unpack it to a folder, for example C:\Java.
  2. Update the system environment variable PATH to include C:\Java\jre\bin, follow this guide provided by Java.

Download Kafka

  1. Download the binaries from http://kafka.apache.org/downloads.html
  2. Unpack it to a folder, for example C:\kafka

Step 1: Update files and configurations

Update Kafka configuration files

The config files need to be updated corresponding to Windows path naming convention.
Change this path if you using Windows
*** Important :- create this path inside Kafka root directory,other wise Kafka server may not start
  1. Open config\server.properties, change







server.properties
1
log.dirs=/tmp/kafka-logs

to







server.properties
1
log.dirs=c:/kafka/kafka-logs

2.. Open config\zookeeper.properties, change







zookeeper.proerties
1
dataDir=/tmp/zookeeper

to







zookeeper.properties
1
dataDir=c:/kafka/zookeeper-data

Step 2: Start the Server

In Windows Command Prompt, switch the current working directory to C:\kafka:







1
cd C:\kafka

  1. Start Zookeeper
  2. you can create bat file to start Zooker (optional)  kafka-server-start.bat and put below conteent      kafka-server-start.bat {base folder}\kafkaBinary\kafka_2.11-1.1.0\config\server.properties
Kafka uses ZooKeeper so you need to first start a ZooKeeper server if you don’t already have one. You can use the convenience script packaged with Kafka to get a quick-and-dirty single-node ZooKeeper instance.













  1. you can create bat file to start Kafka (optional)    kafka-server-start.bat and put below conteent      zookeeper-server-start.bat {base folder}\kafka_2.11-1.1.0\config\zookeeper.properties








1
> .\bin\windows\kafka-server-start.bat .\config\server.properties

Step 3: Create a topic

  1. Create a topic
Let’s create a topic named “test” with a single partition and only one replica:







1
> .\bin\windows\kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

2.. List topics
We can now see that topic if we run the list topic command:







1
> .\bin\windows\kafka-topics.bat --list --zookeeper localhost:2181

Step 4: Send some messages

Kafka comes with a command line client that will take input from a file or from standard input and send it out as messages to the Kafka cluster. By default each line will be sent as a separate message.
  1. Start console producer







1
> .\bin\windows\kafka-console-producer.bat --broker-list localhost:9092 --topic test

2.. Write some messages







1
2
This is a message
This is another message

Step 5: Start a consumer








1
> .\bin\windows\kafka-console-consumer.bat --zookeeper localhost:2181 --topic test --from-beginning

If you have each of the above commands running in a different terminal then you should now be able to type messages into the producer terminal and see them appear in the consumer terminal.
Yay cheers!

Common Errors and Solutions

classpath is empty. please build the project first e.g. by running 'gradlew jarall'

  • Note: Do not download a source files from appache kafka, download a binary file

Kafka java.io.EOFException - NetworkReceive.readFromReadableChannel

Zookeeper may not configured propery (may be configured with different port number)

References
https://kafka.apache.org/

Kafka tool installation:

This is GUI interface of kafka where you can view messages,check count of messages and create and delete Kafka topic
Down load kafka tool from 
http://www.kafkatool.com/
Now install executable file in your system
This is very useful tool to view data in kafka







Load data to Kafka Topic using pentaho 8.0:

step 0:-if you are not using pentaho 8 you may require to download kafka producer and consumer from pentaho wiki
https://wiki.pentaho.com/display/EAI/Apache+Kafka+Producer
https://wiki.pentaho.com/display/EAI/Apache+Kafka+Consumer

1.start spoon.bat
2.open spoon canvas
3.Read  data from file/database or data grid
4.convert data to valid Json object 
5.Load data to kafka topic (may be conversion to valid json object is required)

Download below example from here
https://drive.google.com/file/d/1Ik0RGTSTKphPrGN4M7rwDLhFDpsDbAsd/view?usp=sharing






while using  pentaho consumer you can use same configration as shown in above example

Important:-you need to change GROUP ID every time if you want to read data from start. 



Some useful Json quary Tips. 



Exapmle


---------------
{ "store": {
    "book": [ 
      { "category": "reference",
        "author": "Nigel Rees",
        "title": "Sayings of the Century",
        "price": 8.95
      },
      { "category": "fiction",
        "author": "Evelyn Waugh",
        "title": "Sword of Honour",
        "price": 12.99
      },
      { "category": "fiction",
        "author": "Herman Melville",
        "title": "Moby Dick",
        "isbn": "0-553-21311-3",
        "price": 8.99
      },
      { "category": "fiction",
        "author": "J. R. R. Tolkien",
        "title": "The Lord of the Rings",
        "isbn": "0-395-19395-8",
        "price": 22.99
      }
    ],
    "bicycle": {
      "color": "red",
      "price": 19.95
    }
  }
}

Json Query Result 




XPathJSONPathResult
/store/book/author$.store.book[*].authorthe authors of all books in the store
//author$..authorall authors
/store/*$.store.*all things in store, which are some books and a red bicycle.
/store//price$.store..pricethe price of everything in the store.
//book[3]$..book[2]the third book
//book[last()]$..book[(@.length-1)]
$..book[-1:]
the last book in order.
//book[position()<3]$..book[0,1]
$..book[:2]
the first two books
//book[isbn]$..book[?(@.isbn)]filter all books with isbn number
//book[price<10]$..book[?(@.price<10)]filter all books cheapier than 10
//*$..*all Elements in XML document. All members of JSON structure.

References 
http://www.jsonquerytool.com/
http://goessner.net/articles/JsonPath/