What is Apache Flafka? How to use it with Flume for data ingestion [Tutorial]

Apache Kafka is an open-source distributed stream-processing and message-queuing platform written in Scala and Java. It is used to publish and subscribe to messages, which are kept in sequential order within each topic partition. Kafka is a fast, scalable, durable, and fault-tolerant publish-subscribe messaging system that offers high throughput, reliability, and replication.

In the Apache Kafka distributed platform, a Kafka cluster contains one or more servers, called Kafka brokers. Producers are processes that publish data (i.e. push messages) to Kafka topics on the brokers. Consumers are processes that subscribe to topics and pull messages from them.
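
The producer, topic, and consumer roles described above can be sketched with a toy in-memory model. This is only an illustration of the publish-subscribe idea, not the Kafka client API: the class `ToyTopic`, its methods, and the group names are hypothetical, and a real topic is partitioned, replicated, and stored on brokers.

```python
from collections import defaultdict


class ToyTopic:
    """In-memory stand-in for a single-partition topic (illustration only)."""

    def __init__(self, name):
        self.name = name
        self.log = []                      # append-only list of messages
        self.offsets = defaultdict(int)    # next offset to read, per consumer group

    def publish(self, message):
        """Producer side: append a message and return its offset."""
        self.log.append(message)
        return len(self.log) - 1

    def poll(self, group, max_messages=10):
        """Consumer side: pull the next unread messages for this group."""
        start = self.offsets[group]
        batch = self.log[start:start + max_messages]
        self.offsets[group] = start + len(batch)
        return batch


topic = ToyTopic("kafkatest")
for line in ["first", "second", "third"]:
    topic.publish(line)

print(topic.poll("hdfs-writer", max_messages=2))  # ['first', 'second']
print(topic.poll("hdfs-writer"))                  # ['third']
print(topic.poll("audit"))                        # ['first', 'second', 'third']
```

Each group keeps its own read position, so a second subscriber sees the full sequence in the same order without affecting the first.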

Apache Flume, on the other hand, is an open-source, distributed, reliable, and available service for collecting and moving large amounts of data into storage systems such as the Hadoop Distributed File System (HDFS) and HBase. Flume acts as a centralized service for ingesting large volumes of streaming data, such as logs, into stores such as HDFS.

The Flume agent is responsible for moving messages from the source (i.e. the source path) to the sink (i.e. the destination path). The agent has the following components:

  • Source: Receives messages from a client or source path and passes them to the channel.
  • Sink: Delivers data to storage. Flume provides different sinks for different stores, such as the HDFS sink and the HBase sink.
  • Channel: Acts as an intermediate buffer between the source and the sink for passing messages.
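
The way these three components cooperate can be sketched as a small toy model. It is not Flume code: `MemoryChannel`, `run_source`, and `run_sink` are hypothetical names, and a plain Python list stands in for HDFS or HBase.

```python
from collections import deque


class MemoryChannel:
    """Bounded in-memory buffer between a source and a sink (toy model)."""

    def __init__(self, capacity):
        self.capacity = capacity
        self.events = deque()

    def put(self, event):
        """Accept an event from the source, or fail if the buffer is full."""
        if len(self.events) >= self.capacity:
            raise OverflowError("channel is full")
        self.events.append(event)

    def take(self):
        """Hand the oldest buffered event to the sink, or None if empty."""
        return self.events.popleft() if self.events else None


def run_source(lines, channel):
    """Source: receive messages and pass them to the channel."""
    for line in lines:
        channel.put(line)


def run_sink(channel, storage):
    """Sink: drain the channel and write each event to storage."""
    while (event := channel.take()) is not None:
        storage.append(event)


channel = MemoryChannel(capacity=1000)
storage = []          # stands in for HDFS or HBase
run_source(["log line 1", "log line 2"], channel)
run_sink(channel, storage)
print(storage)        # ['log line 1', 'log line 2']
```

The channel decouples the two sides: the source can keep accepting events while the sink is slow, up to the channel's capacity.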

Integrating Flume with Kafka

Flume is a data ingestion tool that moves data from one place to another. Integrated with Kafka, Flume streams high volumes of log data from a source to a destination, for example to store the data in HDFS.

Deploying Flafka into Production

Using Flume with Kafka:

Kafka and Flume are separate tools. Integrating them lets you stream data from a Kafka topic at high speed to different sinks. Here, Flume acts as the consumer and stores the data in HDFS.

1. Start the Zookeeper server
bin/zkServer.sh start

2. Start the Kafka server
bin/kafka-server-start.sh config/server.properties

3. Create the topic in Kafka
./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic kafkatest

4. Start a console producer for the Kafka topic
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic kafkatest

5. Download and install Apache Flume on your local machine, then create a Flume configuration file, for example flume-conf.properties.

Use the Kafka source to stream data from Kafka topics to Hadoop. The Kafka source can be combined with any Flume sink, making it easy to write Kafka data to HDFS, HBase, etc.

The following is the Flume configuration:

a1.sources = r1
a1.sinks = sample
a1.channels = sample-channel

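# Describe/configure the source: read events from the Kafka topic
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.channels = sample-channel
a1.sources.r1.zookeeperConnect = localhost:2181
# Topic created in step 3
a1.sources.r1.topic = kafkatest
# zookeeperConnect and topic are the Flume 1.6 property names; Flume 1.7 and
# later use kafka.bootstrap.servers and kafka.topics instead.
# spoolDir and basenameHeader apply only to the spooldir source type, so they
# are not set for a Kafka source.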

# Use a channel which buffers events in memory
a1.channels.sample-channel.type = memory

a1.channels.sample-channel.capacity = 1000
a1.channels.sample-channel.transactionCapacity = 1000
a1.channels.sample-channel.byteCapacityBufferPercentage = 20
a1.channels.sample-channel.byteCapacity = 131072000

# properties of sample-sink
a1.sinks.sample.channel = sample-channel
a1.sinks.sample.type = hdfs
a1.sinks.sample.hdfs.writeFormat = Text
#a1.sinks.sample.hdfs.path = hdfs://namenode/flumesource/source1
a1.sinks.sample.hdfs.path = hdfs://localhost:50000/tmp/kafka/%{topic}/%y-%m-%d
a1.sinks.sample.hdfs.useLocalTimeStamp = true
#a1.sinks.sample.hdfs.filePrefix=demo
#a1.sinks.sample.hdfs.fileSuffix=.txt
a1.sinks.sample.hdfs.rollInterval=0
# deletePolicy belongs to the spooling-directory source; the HDFS sink has no such property
#a1.sinks.sample.hdfs.deletePolicy=immediate
#a1.sinks.sample.hdfs.batchSize =1000
a1.sinks.sample.hdfs.rollSize=131072000
a1.sinks.sample.hdfs.rollCount=0
a1.sinks.sample.hdfs.idleTimeout=0
a1.sinks.sample.hdfs.maxOpenFiles = 10000
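
To see what the `hdfs.path` value in this configuration resolves to, the following sketch approximates how the escape sequences are expanded. It is not Flume's implementation: `expand_hdfs_path` is a hypothetical helper, it handles only `%{header}`, `%y`, `%m`, and `%d`, and it assumes each event carries a `topic` header (here with the value `kafkatest`, the topic created in step 3). With `hdfs.useLocalTimeStamp = true`, the date parts come from the agent's local clock, which the `when` argument stands in for.

```python
import re
from datetime import datetime


def expand_hdfs_path(template, headers, when):
    """Approximate expansion of %{header} and a few date escapes in hdfs.path."""
    # Replace %{name} with the value of the event header called name.
    path = re.sub(r"%\{(\w+)\}", lambda m: headers[m.group(1)], template)
    # %y, %m and %d mean the same here as in strftime:
    # two-digit year, two-digit month, two-digit day of month.
    for escape in ("%y", "%m", "%d"):
        path = path.replace(escape, when.strftime(escape))
    return path


template = "hdfs://localhost:50000/tmp/kafka/%{topic}/%y-%m-%d"
headers = {"topic": "kafkatest"}          # assumed header value for illustration
print(expand_hdfs_path(template, headers, datetime(2018, 3, 7)))
# hdfs://localhost:50000/tmp/kafka/kafkatest/18-03-07
```

Under these assumptions, events are grouped into one directory per topic and per day.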

6. Start Flume to copy the data into the HDFS sink
bin/flume-ng agent --conf conf --conf-file conf/flume-conf.properties -Dflume.root.logger=DEBUG,console --name a1 -Xmx512m -Xms256m

What are the best practices for Flafka?

As a producer

Use a Flume source together with the Kafka sink to write to a Kafka topic.

Here is the Flume configuration that makes Flume act as a producer for Kafka:

a1.sources = r1
a1.sinks = sample
a1.channels = sample-channel
a1.sources.r1.type = exec
a1.sources.r1.command = cat /home/indium/dek.csv
a1.sources.r1.logStdErr = true
a1.channels.sample-channel.type = memory
a1.channels.sample-channel.capacity = 1000
a1.channels.sample-channel.transactionCapacity = 100
a1.sources.r1.selector.type = replicating
a1.sources.r1.channels = sample-channel
a1.sinks.sample.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.sample.topic = sample_topic
a1.sinks.sample.brokerList = localhost:9092
a1.sinks.sample.requiredAcks = 1
a1.sinks.sample.batchSize = 20
a1.sinks.sample.channel = sample-channel
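
Both configurations follow the same wiring rule: every source lists its channels under `channels`, every sink names one channel under `channel`, and all property keys start with the agent name. A quick pre-flight check along these lines can catch wiring slips before the agent starts. The sketch below is a hypothetical helper, not part of Flume, and it checks only these three things.

```python
def parse_properties(text):
    """Parse key = value lines, skipping blank lines and # comments."""
    props = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith("#"):
            continue
        key, _, value = line.partition("=")
        props[key.strip()] = value.strip()
    return props


def check_wiring(props, agent):
    """Return a list of problems with how sources and sinks reference channels."""
    problems = []
    channels = set(props.get(f"{agent}.channels", "").split())
    for source in props.get(f"{agent}.sources", "").split():
        bound = props.get(f"{agent}.sources.{source}.channels", "").split()
        if not bound:
            problems.append(f"source {source} is not bound to any channel")
        for name in bound:
            if name not in channels:
                problems.append(f"source {source} uses undeclared channel {name}")
    for sink in props.get(f"{agent}.sinks", "").split():
        name = props.get(f"{agent}.sinks.{sink}.channel")
        if name not in channels:
            problems.append(f"sink {sink} uses undeclared or missing channel {name}")
    for key in props:
        if not key.startswith(agent + "."):
            problems.append(f"property {key} does not belong to agent {agent}")
    return problems


config = """
a1.sources = r1
a1.sinks = sample
a1.channels = sample-channel
a1.sources.r1.channels = sample-channel
a1.sinks.sample.channel = sample-channel
"""
print(check_wiring(parse_properties(config), "a1"))   # []
```

An empty list means the wiring is consistent; a key written with a different agent name, or a sink pointing at a channel that was never declared, shows up as an entry in the list.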

As a consumer

Use the Kafka source to read from a Kafka topic and write to a Flume sink.

We have already seen the Flume configuration for this case, including how to write to the HDFS sink. The diagram covers both the producer and the consumer roles, and shows how to integrate Kafka with Flume to publish data to a Kafka topic as well as to write data to HDFS storage.

[Diagram: Flume acting as both producer and consumer with Kafka]

In conclusion

As a best practice for streaming high-velocity data, integrate Kafka with Flume. Flafka adds flexibility to the data pipeline and makes it possible to build a distributed ingestion pipeline that, with careful tuning, can ingest more than 1 million events per second.
