Sunday, 4 January 2015

How to import data from RDBMS to Hadoop and vice versa?

Hadoop became very popular within a few years because of its robust design, its open-source nature, and its ability to handle large data. Many RDBMS-to-Hadoop migration projects are happening these days. Hadoop is not a replacement for an RDBMS, but for certain use cases it can perform better than one. Some projects need data from an RDBMS combined with several other sources to find insights. In these scenarios we need to transfer data from the RDBMS to the Hadoop environment. The task sounds simple, but it is difficult in practice because it involves a lot of risk. The possible solutions for importing data from an RDBMS into Hadoop are explained below.

1) Using SQOOP
Sqoop is a Hadoop ecosystem component developed for importing data from an RDBMS into Hadoop and for exporting data from Hadoop back to an RDBMS. A Sqoop job runs as a MapReduce job, so it uses Hadoop's parallelism to import and export in parallel. Internally, Sqoop runs as a map-only job that talks to the database over JDBC. To use Sqoop, we need good network connectivity between the RDBMS environment and the Hadoop environment.

2) By dumping the data from database and transferring via portable secondary storage devices
Many companies do not allow direct network connectivity from Hadoop to the RDBMS environment. One reason is that a Sqoop job can push a very high volume of data through the network, which affects the performance of other systems connected to it. In such cases the data is dumped from the database, copied to a portable secondary storage device or to cloud storage (if allowed), and then transferred to the Hadoop environment.

How to add new nodes to a hadoop cluster ?

Hadoop doesn't require any downtime for adding new nodes to the cluster. The following steps explain the procedure for adding new nodes to an existing Hadoop cluster.
  • Get the machine ready with proper hardware and compatible software.
  • Install the DataNode and the TaskTracker (MRv1) or NodeManager (YARN) on the new machine.
  • Configure the data storage locations and temporary storage locations properly.
  • Add the same configuration files as the existing nodes in the Hadoop cluster.
  • Add the new machine to the network properly.
  • If the Hadoop cluster is configured with allowed hosts, add the hostnames of the new machines to the allowed hosts file and refresh the node list (for example with hdfs dfsadmin -refreshNodes) before starting the services.
  • Start the DataNode and TaskTracker/NodeManager services on the new machines.
  • The new machines will be added to the Hadoop cluster automatically. You can verify this from the NameNode and JobTracker/ResourceManager UI.
  • After the nodes are added to the cluster, you can run the balancer if required.

Map and reduce phase internals - Passing Key and Value pairs from mapper to reducer

Suppose the mappers generated the following output

(coder,1)
(fox,1)
(in,1)
(boots,1)
(coder,1)
(hadoop,1)

How many keys will be passed to the reducer's reduce() method?

Ans : five

Reason: The input to the reducer's reduce() method is a key and a list of values. The mapper output is grouped by key and then sent to the reducer. In this case, after grouping by key, we get:
coder,[1,1]
fox,[1]
in,[1]
boots,[1]
hadoop,[1]

Here we have five unique keys, so five keys will be sent to the reducer, and reduce() will be called once for each key.
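The grouping step can be imitated in plain Java to check the count. This is only a sketch and does not use the Hadoop API; the class name GroupByKeyDemo and its methods are made up for illustration. A TreeMap is used because the framework hands keys to the reducer in sorted order.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java illustration of the grouping step; not Hadoop API code.
class GroupByKeyDemo {

    // Groups (key, value) pairs by key, similar to what happens
    // between the map phase and the reduce() calls.
    static Map<String, List<Integer>> group(List<Map.Entry<String, Integer>> mapOutput) {
        // TreeMap keeps the keys sorted, like the keys seen by a reducer.
        Map<String, List<Integer>> grouped = new TreeMap<>();
        for (Map.Entry<String, Integer> pair : mapOutput) {
            grouped.computeIfAbsent(pair.getKey(), k -> new ArrayList<>())
                   .add(pair.getValue());
        }
        return grouped;
    }

    // The six mapper output records from the example above.
    static List<Map.Entry<String, Integer>> sampleMapOutput() {
        return List.of(
            Map.entry("coder", 1), Map.entry("fox", 1), Map.entry("in", 1),
            Map.entry("boots", 1), Map.entry("coder", 1), Map.entry("hadoop", 1));
    }

    public static void main(String[] args) {
        Map<String, List<Integer>> grouped = group(sampleMapOutput());
        // Each entry corresponds to one reduce() call.
        grouped.forEach((key, values) -> System.out.println(key + "," + values));
        System.out.println("Number of reduce() calls: " + grouped.size());
    }
}
```

Six records go in, but only five entries come out, because the two (coder,1) records collapse into a single key with two values.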

Role of partitioner in mapreduce execution

A MapReduce job has two main phases, the map phase and the reduce phase. The output of the mappers is passed to the reducers. The input and output of a mapper are key-value pairs, while the input of a reducer is a key and a list of values. Each reducer receives either a portion of the map output or, when there is only one reducer, the complete map output. The map output is partitioned by key, producing one partition per reducer, and each partition is sent to its reducer. The partitioning ensures that all records with the same key land in the same partition and therefore reach the same reducer. Otherwise we would get incorrect output. The default partitioner in Hadoop uses the hash value of the key to partition the map output. This can be overridden by writing a user-defined partitioner.
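The hash-based partitioning can be sketched in plain Java. The arithmetic below (clear the sign bit of the hash, then take the remainder by the number of reducers) mirrors what the default hash partitioner does, but the class HashPartitionDemo is hypothetical and it uses String.hashCode() as a stand-in for the hash of a Hadoop key type, so the partition numbers printed here will not match a real job.

```java
import java.util.List;

// Plain-Java sketch of hash partitioning; not Hadoop API code.
class HashPartitionDemo {

    // Clears the sign bit so the result is never negative,
    // then maps the hash onto the range [0, numReduceTasks).
    static int partitionFor(String key, int numReduceTasks) {
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }

    public static void main(String[] args) {
        int numReduceTasks = 3; // assumed reducer count for the demo
        for (String key : List.of("coder", "fox", "in", "boots", "coder", "hadoop")) {
            // Both "coder" records print the same partition number.
            System.out.println(key + " -> partition " + partitionFor(key, numReduceTasks));
        }
    }
}
```

Because the partition depends only on the key and the number of reducers, every record with the same key is routed to the same reducer, which is the property the paragraph above relies on.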

Saturday, 3 January 2015

When will the reduce tasks start execution in a mapreduce job?

The reduce logic starts only after all map tasks have completed. Sometimes we may see the reduce progress at 15% while the map progress is only 90%. This doesn't mean that the reduce logic has started executing. The reduce logic has to run on the complete output of all the mappers, not on partial output.

The reduce() method will not be called until all of the mappers have completed. If the reduce logic were applied to partial map output, we would get incorrect output. The 15% (or any other value) of reduce progress shown before the map phase completes is not progress of the reduce logic; it represents the transfer of mapper output to the reducers. The mapred.reduce.slowstart.completed.maps property (named mapreduce.job.reduce.slowstart.completedmaps in newer releases) specifies the fraction of mappers, between 0 and 1, that must complete before the reducers can start receiving data from the completed mappers. Once all the mappers have completed, the reducers start executing on top of the map outputs.
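To get a feel for the slowstart setting, here is a small plain-Java sketch that turns the fraction into a number of map tasks. The class SlowstartDemo and its method names are hypothetical, and rounding up with Math.ceil is an assumption of this sketch rather than something stated above.

```java
// Plain-Java sketch of the slowstart threshold; not Hadoop API code.
class SlowstartDemo {

    // Number of map tasks that must finish before reducers may start
    // copying map output. Rounding up is an assumption of this sketch.
    static int mapsRequiredBeforeCopy(double slowstartFraction, int totalMaps) {
        return (int) Math.ceil(slowstartFraction * totalMaps);
    }

    // True once enough maps have completed for the copy phase to begin.
    static boolean reducersMayStartCopying(int completedMaps,
                                           double slowstartFraction,
                                           int totalMaps) {
        return completedMaps >= mapsRequiredBeforeCopy(slowstartFraction, totalMaps);
    }

    public static void main(String[] args) {
        int totalMaps = 10;
        double fraction = 0.5; // example value, not the cluster default
        System.out.println("Maps required: " + mapsRequiredBeforeCopy(fraction, totalMaps));
        System.out.println("Can copy after 4 maps? " + reducersMayStartCopying(4, fraction, totalMaps));
        System.out.println("Can copy after 5 maps? " + reducersMayStartCopying(5, fraction, totalMaps));
    }
}
```

With a fraction of 1.0 the reducers would not begin copying until every mapper is done, so the reduce progress would stay at 0% for the whole map phase.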

What happens to a mapreduce job with a reduce class when we set the number of reduce tasks as zero ?

When we set the number of reduce tasks to zero, no reduce tasks are executed. The output of each mapper is written directly to HDFS and becomes the output of the job. Suppose 10 mappers were spawned for a job; if we set the number of reduce tasks to zero, we will get 10 output files.
The output files will have names like part-m-00000, part-m-00001, ..., part-m-00009.
We can set the number of reduce tasks to zero either from the program or from the command line.

In the program we can set this by setting the following configuration
job.setNumReduceTasks(0);

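From the command line we can achieve the same result with the property below (mapreduce.job.reduces in newer releases). The driver has to use ToolRunner or GenericOptionsParser for -D options to be picked up.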
-Dmapred.reduce.tasks=0
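The naming pattern of the map-only output files mentioned above can be reproduced with a few lines of plain Java. This is just an illustration of the part-m-NNNNN pattern; the class MapOnlyOutputNames is hypothetical and is not part of Hadoop.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java sketch of the output file names of a map-only job.
class MapOnlyOutputNames {

    // One output file per map task, numbered from zero with five digits.
    static List<String> outputFileNames(int numMapTasks) {
        List<String> names = new ArrayList<>();
        for (int i = 0; i < numMapTasks; i++) {
            names.add(String.format("part-m-%05d", i));
        }
        return names;
    }

    public static void main(String[] args) {
        // Ten mappers, as in the example above.
        outputFileNames(10).forEach(System.out::println);
    }
}
```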

What happens to a mapreduce job if the user sets the number of reduce tasks as one ?

When the number of reduce tasks is set to one, only one reduce task is executed for the entire job. All the intermediate map outputs are gathered by that single reducer. It processes the entire map output and stores the result in a single file in HDFS, named part-r-00000.
To set the number of reduce tasks to one, add the following line in the driver class.
job.setNumReduceTasks(1);

What is the best tool for creating workflow or chaining jobs in hadoop ?

Sometimes we need a tool to chain MapReduce jobs, Hive jobs, Pig jobs and so on. We can chain these jobs in our own way, either with programs or with scripts. But the best way to chain jobs in the Hadoop ecosystem is to use Oozie.

Oozie is a workflow and orchestration framework in the Hadoop ecosystem. With it, we don't need to worry about the various scenarios that would otherwise have to be handled while developing our own chaining tool. Oozie is a very simple tool, and a workflow is defined in an XML file. For more details, refer to the Oozie website.

How to pass small number of configuration parameters to a mapper and reducer ?

Hadoop has several configurable properties, spread across several XML and properties files. The main configuration files in Hadoop are core-site.xml, mapred-site.xml, hdfs-site.xml and yarn-site.xml. The parameters in these files are set by the administrator while installing the cluster.

If a developer wants to modify some of the configuration parameters, or pass a few custom ones, while developing a MapReduce program, this can be done from the program itself. The way to do it is to instantiate the Configuration class and set the values by passing each parameter and its value as a key-value pair.

The syntax is as shown below
Configuration conf = new Configuration();
conf.set("key1","value1");
conf.set("key2","value2");

Set the values before creating the Job from this Configuration, because the Job takes a copy of it. Inside the mapper or reducer, a value can be read back with context.getConfiguration().get("key1").

Friday, 2 January 2015

Managed and External Tables in Hive

In hive, we can create two types of tables
  • Managed table
  • External table
By default, Hive stores data in the Hive warehouse directory. When we create a table in Hive, a directory corresponding to the table is created in the warehouse directory. The warehouse directory is a location in HDFS where Hive stores the data of all the tables that we create without specifying a location. By default the warehouse directory is /user/hive/warehouse. We can change this location globally by setting the hive.metastore.warehouse.dir property to a different value in hive-site.xml.

We can point a Hive table to any other location in HDFS rather than the default storage location. The main difference between external and managed tables is that if we drop a managed table, both the table and its data are deleted, but if we drop an external table, only the table definition is deleted and the data stays in place.
External tables are very useful in scenarios where we need to share the input data between multiple jobs or users.

Suppose a workflow has A as the input of processes B, C and D, where B is a Hive job, C is a MapReduce job and D is a Pig job. If we use a managed table for B, loading the data moves it from A's actual location to the warehouse directory. So when the other processes C and D try to access the data, it will no longer be present in its original location. Also, if the user drops the table at the end of process B, the input data is deleted, which may not be acceptable in this situation.
Sample DDL for creating a managed Hive table is given below.

create table details (id int, name string) row format
delimited fields terminated by ',' lines terminated by '\n';

Sample DDL for creating an external table

create external table details_ext(id int, name string) row format
delimited fields terminated by ',' lines terminated by '\n'
location '/user/hadoop/external_table';

The location specified in the external table can be any location in HDFS. You can omit the 'lines terminated by' part in the DDL because the default value is '\n'.

What is hive ?


Hive is one of the members of the Hadoop ecosystem. Hadoop is written in Java, so initially Hadoop was limited to the subset of engineers who knew Java. Later, some smart guys at Facebook designed a layer on top of Hadoop that can act as a mediator between SQL experts and Hadoop. They built an application that accepts SQL-like queries, parses them, and talks to Hadoop. This application is called Hive. Internally it accesses HDFS, and the data processing happens through MapReduce. The main advantage is that the user doesn't need to worry about the complexity of writing lengthy MapReduce programs. After the invention of this application, Hadoop became popular among SQL experts through Hive. Another advantage of Hive is that the development time for some solutions is much shorter compared to writing Java programs. Sometimes a few lines of query do the work of several hundred lines of code.

In Hive the data is represented as tables. A table is a representation of data with a schema. In Hadoop the data is stored in HDFS, so if we look at the data through a schema, we can view it in tabular format. In Hive the schema is stored in the metastore. The metastore is a lightweight database where Hive stores the metadata of tables. By default Hive uses a Derby database, which is not suitable for production or multi-user environments, so people usually use MySQL, PostgreSQL or similar as the metastore.

What is hadoop ?


Hadoop is a framework designed specially for handling large data. The intention behind the development of Hadoop was to build a scalable, low-cost framework that can process large data. Hadoop has a distributed file system and a distributed processing layer, both of which sit on top of several commodity machines. The teamwork of these commodity machines is the strength of Hadoop.

The distributed storage layer of Hadoop is called the Hadoop Distributed File System (HDFS), and the distributed processing layer is called MapReduce. The ideas behind HDFS and MapReduce came from Google's own frameworks, the Google File System (GFS) and Google MapReduce.

Hadoop is designed to run on commodity hardware, which reduces the cost. Many other data processing frameworks rely on the hardware itself to handle faults, but in Hadoop the framework handles hardware failure. Hadoop doesn't require any RAID arrangement of disks. It just requires the disks in a JBOD configuration. JBOD means "just a bunch of disks".
