Autres apports de la version 2 de Hadoop

La version 2 de Hadoop supporte officiellement Windows Server et Windows Azure.

La version 2 de Hadoop permet aussi :

  • D’accéder à HDFS à partir d’un système de fichiers distribué NFS-v3, ce qui permet alors de considérer HDFS comme un disque partagé standard.
  • De prendre des “clichés” (snapshots) de tout ou partie d’un cluster.

Hadoop 2 – High Availability

Dans la version 1 de Hadoop, le NameNode (NN) est un SPOF (Single Point Of Failure) et la seule façon de parer à une défaillance du NN était :

  • De lui affecter une machine à haute tolérance aux pannes.
  • D’avoir une machine “miroir”, prête à démarrer en cas de défaillance du NN.

Depuis Hadoop 2.0.0, cette notion de machine “miroir” a été remplacée par le mode HA (High Availability = haute disponibilité). Dans ce mode, il existe deux NN, l’un actif (Active NN), l’autre en veille (Standby NN). Le Standby NN est prêt à remplacer automatiquement l’Active NN en cas de défaillance de ce dernier, en quelques secondes en mode automatique.

Le basculement de l’Active NN vers le Standby NN peut se faire de manière automatique ou manuelle.

Les daemons de HDFS en mode High Availability (HA)

Les daemons de HDFS en mode High Availability (HA)

Un daemon nommé ZooKeeper Failover Controller (ZKFC) s’exécute sur chaque NameNode (Active NN et Standby NN). Si ZooKeeper ne reçoit pas d’information de l’Active NN pendant un temps prédéfini, il considérera ce dernier comme défaillant, et basculera sur le Standby NN.

Les principaux apports de la version 2 d’Hadoop – Partie 1

L’objectif de ce article est de présenter les principaux apports de la version 2 de Hadoop. Ils sont au nombre de quatre :

  • High Availability (HA) ou haute disponibilité en français , un nouveau dispositif de HDFS qui permet de faire en sorte que le NameNode (NN) ne soit plus un Single Point Of Failure (SPOF).
  • Federation, un nouveau dispositif de HDFS qui permet de gérer de manière plus efficace les clusters de grande taille.
  • YARN, un gestionnaire de ressources générique de seconde génération sur lequel s’appuie MapReduce, et qui peut être aussi mis en œuvre par d’autres modèles de traitement de données.
  • La possibilité d’installer Hadoop directement sous Microsoft Windows.

Pour une liste à jour des dernières versions de Hadoop et de leurs fonctionnalités, se reporter l’adresses suivante : http://hadoop.apache.org/releases.html#News.

High Availability

Dans la version 1 de Hadoop, les clusters Hadoop disposaient d’un NN unique qui gérait l’arborescence des fichiers HDFS et suivait le stockage des  données dans un cluster.

Jusqu’à Hadoop 2.0.0, la seule façon de parer à une défaillance du NN était :

  • De lui affecter une machine à haute tolérance aux pannes.
  • D’avoir une machine “miroir”, prête à démarrer en cas de défaillance du NN.

Depuis Hadoop 2.0.0, cette notion de machine “miroir” a été institutionnalisée par le biais du mode HA (High Availability = haute disponibilité). Dans ce mode, il existe deux NN, l’un actif (Active NN), l’autre en veille (Standby NN). Le Standby NN est prêt à remplacer automatiquement l’Active NN en cas de défaillance de ce dernier, en quelques secondes en mode automatique.

Les informations sur la cartographie des données du cluster sont disponibles à la fois au niveau de l’Active NN et au niveau du Standby NN.

Le Standby NN assume en outre la fonction du SNN, qui n’existe donc plus tant qu’il est en veille.

L’opération consistant à basculer de l’Active NN vers le Standby NN peut se faire de manière automatique (Automatic Failover) ou manuelle (Manual Failover).

Les daemons de HDFS en mode "High Availability (HA)"

Les daemons de HDFS en mode “High Availability (HA)”

Développement de programmes MapReduce

les principes de fonctionnement de MapReduce restent valables quel que soit le langage de programmation utilisé.

Un programme Hadoop se divise généralement en trois parties :

  • Le driver, qui s’exécute sur une machine client, est chargé de configurer le job puis de le soumettre pour exécution.
  • Le mapper est chargé de lire les données stockées sur disque et les traiter.
  • Le reducer est chargé de consolider les résultats issus du mapper puis de les écrire sur disque.

1 – Les entrées-sorties

Dans MapReduce, les données sont toujours lues ou écrites selon le format <key, value> (<clef, valeur>).

Les données lues par le mapper sont définies au niveau du driver. La définition des données comprend :

  • Leur localisation (fichier ou répertoire).
  • Le type des enregistrements, qui est défini par la classe InputFormat.
  • La détermination de la taille des InputSplits : un InputSplit définit le volume des données qui seront lues à chaque opération de lecture.

Hadoop prend en compte par défaut les types d’enregistrement suivants :

  • TextInputFormat :
    • value est une ligne entière terminée par \n
    • key est l’offset de la ligne depuis le début de fichier
  • KeyValueTextInputFormat:
    • Chaque ligne est supposée être au format <key><separator><value>\n.
    • Le separateurr par défaut est tab.
  • SequenceFileInputFormat :
    • Permet de lire un fichier binaire de paires <key, value>, comprenant éventuellement des métadonnées.
  • SequenceFileAsTextInputFormat :
    • • Format identique au précédent mais, en plus, convertit les clefs et les valeurs en strings (<key.toString(), value.toString()>).

Dans Hadoop:

  • Les clefs sont des objets qui implémentent l’interface ComparableWritable.
  • Les valeurs sont des objets qui implémentent l’interface Writable.

Par défaut, Hadoop propose les types de données suivants :

  • IntWritable (int en Java).
  • DoubleWritable (double en Java).
  • Text (string en Java).
  • etc.

2 – La phase map (exemple 1)

Nous allons utiliser le programme WordCount pour expliquer le fonctionnement d’un mapper ainsi que celui d’un reducer. Le programme WordCount est écrit en pseudo-code et est simplifié à l’extrême.

Il s’agit de concevoir un programme Hadoop comptant le nombre d’occurrences des différents mots composant un livre.

Voici le contenu du fichier (livre) en entrée :

Lorem ipsum dolor sit
nec et nec et ultrices
Pellentesque et Pellentesque et bibendum
Lorem ipsum dolor sit

Un enregistrement correspond à une ligne terminée par \n. Dans un véritable programme Hadoop, on utilisera le format TextInputFormat pour lire les enregistrements.

Ci-dessous le mapper du programme WordCount :

Explication :

Ligne 1 : le mapper lit en entrée un enregistrement sous la forme d’un couple <key, value> avec :

  • value du type String (c’est une ligne du fichier).
  • key du type LongWritable (c’est l’offset de la ligne depuis le début du fichier).

ligne 2 : pour chaque mot w de la ligne en cours…

Ligne 3 : on écrit dans un fichier en sortie le couple <w, 1>, 1 correspondant à une occurrence du mot contenu dans la variable w.

Le fichier en sortie de mapper sera donc le suivant :

Lorem,1
ipsum,1
dolor,1
sit,1
nec,1
et,1
nec,1
et,1
ultrices,1
Pellentesque,1
et,1
Pellentesque,1
et,1
bibendum,1
Lorem,1
ipsum,1
dolor,1
sit,1

3 – Entre la phase map et la phase reduce (exemple 1)

Avant d’être envoyé au reducer, le fichier est automatiquement trié par clef par Hadoop : c’est ce que l’on appelle la phase de “shuffle & sort”. Le fichier en entrée du reducer est le suivant :

Lorem,[1,1]
ipsum,[1,1]
dolor,[1,1]
sit,[1,1]
nec,[1,1]
et,[1,1,1,1]
ultrices,[1]
Pellentesque,[1,1]
bibendum,[1]

4. La phase reduce (exemple 1)

Ci-dessous le reducer du programme Hadoop permettant de consolider les résultats issus du mapper :

Explication :

Ligne 1 : le reducer prend en entrée un enregistrement sous la forme d’un couple <key, value> avec :

  • key du type Text (c’est un mot).
  • value étant une liste de valeurs du type intWritable.

Ligne 2 : le reducer remet à zéro le compteur wordCount lorsque l’on change de mot.

Ligne 3 et 4 : pour chaque valeur v dans la liste inValue2 on ajoute v à wordCount.

ligne 5 : quand on change de mot, on écrit dans un fichier en sortie le couple <inKey2, wordCount>, wordCount étant le nombre d’occurrences du mot contenu dans la variable inKey2.

Le fichier en sortie de reducer sera donc le suivant :

Lorem,2
ipsum,2
dolor,2
sit,2
nec,2
et,4
ultrices,1
Pellentesque,2
bibendum,1

Nous venons d’écrire notre premier programme Hadoop, permettant de compter le nombre d’occurrences des différents mots composant un livre !

  • Un job Hadoop comprend au minimum un mapper : il peut ne pas comprendre de phase reduce.
  • Durant la phase de shuffle & sort, tous les mappers sont susceptibles d’envoyer des données vers tous les reducers.

fonctionnement globale de mapreduce

Présentation et fonctionnement de MapReduce

Une bonne compréhension de MapReduce implique la maitrise du fonctionnement de Hadoop et HDFS. Pour cela, ces deux articles sont obligatoires pour mieux appréhender la suite du cours.

Qu’est ce que MapReduce ?

MapReduce est un modèle de programmation conçu spécifiquement pour lire, traiter et écrire des volumes de données très importants. Les programmes adoptant ce modèle sont automatiquement parallélisés et exécutés sur des clusters (grappes) d’ordinateurs. MapReduce consiste en deux fonctions map() et reduce().

map” est le nom d’une fonction de haut niveau qui applique une fonction donnée à chacun des éléments d’une liste et retourne une liste. Par exemple :

(mapcar #’round ‘(1.3 2.7 3.4 4.5)) => (1 3 3 4)

reduce” est le nom d’une fonction de haut niveau qui applique une fonction donnée à tous les éléments d’une liste et retourne une liste unique. Par exemple :

(reduce #’+ ‘(1 2 3 4)) => 10

MapReduce implémente les fonctionnalités suivantes :

  • Parallélisation automatique des programmes Hadoop.
    • HDFS se charge de la répartition et de la réplication des données ;
    • Le maître divise le travail en jobs parallèles et les répartit ;
    • Le maître collecte les résultats et gère les pannes des noeuds.
  • Gestion transparente du mode distribué.
  • Tolérance aux pannes.

Tout l’intérêt de ce modèle de programmation est de simplifier la vie du développeur Hadoop, en lui masquant le fonctionnement interne de Hadoop (parallélisation , tolérance aux pannes,…etc). Ainsi, le modèle de programmation permet au développeur de ne s’intéresser qu’à la partie algorithmique. Il transmet alors son programme MapReduce développé dans un langage de programmation au framework Hadoop pour l’exécution.

Fonctionnement de MapReduce

Un programme MapReduce se compose des trois parties suivantes

  • Le driver, qui s’exécute sur une machine client, est responsable d’initialiser le job puis de le soumettre pour exécution.
  • Le mapper est responsable de lire les données stockées sur disque et les traiter.
  • Le reducer est responsable de consolider les résultats issus du mapper puis de les écrire sur disque.

Création d’un fichier HDFS par un programme Hadoop

Création d’un fichier HDFS par un programme Hadoop comporte les grandes étapes suivantes :

  1. Le programme Hadoop demande la création du fichier à HDFS, par le biais d’une instruction create().
  2. HDFS envoie une demande de création de fichier au NameNode, par le biais d’un appel de type RPC.
  3. Le NameNode s’assure que le fichier n’existe pas et que le programme Hadoop disposes des droits nécessaires pour créer le fichier. Si tout se passe bien, le NameNode ajoute le nouveau fichier dans sa cartographie  des données du cluster. Sinon, une exception IOException est générée
  4. HDFS renvoie une instance de FSDataOutputStream au programme, ce qui lui permet d’accéder au fichier en écriture.
  5. Les lignes envoyées à HDFS pour écriture dans le fichier sont découpées en paquets et stochées temporairemenr dans une file d’attente, qui est prise en charge par une instance de DataStreamer :
    • DataStreamer demande au NameNode les adresses de trois blocs sur trois DataNodes différents pour y stocker les données de la file d’attente.
    • Chaque fois qu’un paquet de données a été stocké avec succès sur un DataNode, un accusé de réception du paquet est envoyé à FSDataOutputStream.
    • DataStreamer pourrait demander au NameNode trois nouvelles adresses lorsqu’un groupe de trois blocs est plein.
  6. Enfin, lorsque le dernier paquet a été écrit avec succès sur trois DataNodes différents, le programme Hadoop envoie close() à FSDataOutputStream.
Mais qu’arrive-t-il si un problème d’écriture intervient au niveau d’un DataNode ? HDFS le détecter, affecte un nouveau bloc de remplacement dans le cluster et réécrit dans ce nouveau bloc tous les paquets susceptibles d’être affectés par le problème d’écriture

Lecture d’un fichier HDFS par un programme Hadoop

Dans le cours précédent “Fonctionnement de HDFS” nous avons présenté HDFS (Hadoop Distributed Filesystem) et nous avons aussi detaillé son fonctionnement. Pour rappel, HDFS est le composant de Hadoop en charge du stockage des données dans un cluster Hadoop.

Dans cette partie nous allons  présenter un peu plus en détail les grandes étapes de Lecture de fichiers HDFS par un programme Hadoop :

  1. Le programme Hadoop demande l’ouverture du fichier à HDFS, par le biais d’une instruction open().
  2. HDFS demande au NameNode de localiser les premiers blocs constituant le fichier, par le biais d’un appel de type RPC
  3. Le NameNode s’assure que le programme Hadoop dispose des droits suffisants pour lire le fichier. Si oui, le NameNode renvoie à HDFS trois adresses pour chaque bloc. Sinon, une exception IOException est générée.
  4. HDFS renvoie une instance de FSDataInputStream au programme Hadoop, ce qui lui permet d’accéder au fichier en lecture, par le biais de l’instruction read().
  5. Une fois l’integralité d’un bloc a été lue, HDFS ferme la connexion avec le DataNode concerné, et en ouvre une nouvelle pour lire le bloc suivant.
  6. Enfin, lorsque la lecture du fichier est terminée, le programme Hadoop envoie close() à FSDataInputStream.
Mais qu’arrive-t-il si un problème de lecture intervient au niveau d’un bloc ? HDFS le détecte et essaye de lire le bloc concerné sur l’un des deux autres DataNode qui hébergent une copie.

 

Fonctionnement de HDFS

Présentation de HDFS

HDFS (Hadoop Distributed File System) est le composant de Hadoop en charge du stockage des données dans un cluster Hadoop

HDFS reprend de nombreux concepts proposés par des systèmes de fichiers classiques comme ext2 pour Linux ou FAT pour Windows. Nous retrouvons donc la notion de blocs (la plus petite unité que l’unité de stockage peut gérer), les métadonnées qui permettent de retrouver les blocs à partir d’un nom de fichier, les droits ou encore l’arborescence des répertoires.

Toutefois, HDFS se démarque d’un système de fichiers classique pour les principales raisons suivantes :

  • HDFS est optimisé pour maximiser les débits de données. La taille d’un bloc de données est ainsi de 64 Mo dans HDFS contre 512 octets à 4 Ko dans la plupart des systèmes de fichiers traditionnels, ce qui permet de réduire le seek time. ( Il est toutefois possible d’augmenter la taille d’un bloc à 128 Mo ou à 256 Mo en fonction des besoins)
  • HDFS est un système de gestion de fichiers du type Write Once Read Many (WORM) : on y écrit une fois le fichier, puis on y accède plusieurs fois..
  • HDFS fournit un système de réplication des blocs dont le nombre de réplications est configurable (3 par défaut). Pendant la phase d’écriture, chaque bloc correspondant au fichier est répliqué sur des nœuds distincts dans le cluster, ce qui contribue à en garantir la fiabilité et la disponibilité au moment de lecture de données. Si un bloc est indisponible sur un nœud, des copies de ce bloc seront disponibles sur d’autres nœuds.
  • HDFS s’appuie sur le système de fichier natif de l’OS pour présenter un système de stockage unifié reposant sur un ensemble de disques et de systèmes de fichiers hétérogènes.

Fonctionnement de HDFS

Le fonctionnement de HDFS est assuré par trois types de deamons :

1. Le NameNode (noeud maitre – master node):

Dans un cluster Hadoop, le NameNode gère l’espace de noms, l’arborescence du système de fichiers et les métadonnées des fichiers et des répertoires. Il centralise la localisation des blocs de données répartis dans le cluster.

Quand un client sollicite Hadoop pour récupérer un fichier, c’est via le Namenode que l’information est extraite. Ce Namenode va indiquer au client quels sont les Datanodes qui contiennent les blocs.

Le NameNode reçoit régulièrement un « battement de cœur » et un Blockreport de tous les DataNodes dans le cluster afin de s’assurer que les datanodes fonctionnent correctement. Un Bloclreport (ou rapport de bloc) contient une liste de tous les blocs d’un DataNode.

En cas de défailance du datanode, le NameNode choisit de nouveaux datanodes pour de nouvelles réplications de blocs de données, équilibre la charge d’utilisation des disques et gère également le trafic de communication des datanodes.

Les métadonnées sont stockées sur disque dur (fichier fsimage) et chargées dans la mémoire vive du NameNode lors du démarrage du cluster
Si vous perdez le NameNode, vous perdez HDFS!

2. Le SecondaryNameNode :

Il s’agit ici d’un abus de langage! Dans l’architecture HDFS, le nom – Secondary NameNode donne l’impression que c’est un substitut du NameNode. Hélas! Ce n’est pas le cas..

Le SecondaryNameNode effectue des tâches de maintenance pour le compte du NameNode. Plus précisément, le Secondary NameNode met à jour le fichier fsimage à interval régulier en y intégrant le contenu des edits.

Le SecondaryNameNode intervient soit :

  • lorsque le fichier edits atteint une taille prédéfinie.
  • à intervalle régulier (par exemple une fois par heure).

3. Le DataNode :

 

Fonctionnement de Hadoop

Pour bien comprendre le fonctionnement de Hadoop, nous vous invitons à lire ces deux cours introductifs sur “Big Data” et “Hadoop”:

L’écosystème Hadoop

Hadoop est principalement constitué de deux composants :

  • Le système de gestion de fichiers distribué, HDFS.
  • Le framework MapReduce (version 1 de Hadoop)/YARN (version 2 de Hadoop).

Plus concrètement l’écosystème Hadoop comprend de nombreux autres outils couvrant le stockage et la répartition des données, les traitements distribués, l’entrepôt de données, le workflow, la programmation, sans oublier la coordination de l’ensemble des composants. On parle des outils comme Hive, Pig, Hbase, Flume,…etc

L’écosystème Hadoop

L’écosystème Hadoop

Stockage et traitement de données

Hadoop a été conçu pour stocker de très gros volumes de données sur un grand nombre de machines (nœuds) équipées de disques durs banalisés, fonctionnant en parallèle.

L’addition de plusieurs nœuds au cluster Hadoop permet d’offrir un espace de stockage et une puissance de calcul pouvant traiter des volumes de données de plusieurs To ou Po.

Le système de gestion de fichiers de Hadoop, HDFS, écrit et lit les fichiers par blocs de 64 Mo par défau. Il est toutefois possible de monter à 128 Mo. Alors que sur des systèmes classiques, la taille est généralement de 4 Ko, l’intérêt de fournir des tailles plus grandes permet de réduire le temps d’accès à un bloc.

Les blocs dans HDFS

Les blocs dans HDFS

La redondance des données

Par défaut, les données chargée dans un cluster Hadoop sont stockées en trois exemplaires, sur des nœuds différents. Cette réplication des données répond en fait à deux objectifs :

  • Disponibilité des données en cas de panne, deux copies des données, stockées sur d’autres nœuds.
  • Lors de l’exécution d’un job Hadoop, chaque tâche peut être exécutée sur n’importe quel nœud, surtout s’il stocke une copie des données nécessaires à la tâche.

Gestion des pannes d’un cluster Hadoop

L’un des avantages de Hadoop réside dans le fait que si un des nœuds qui exécute une partie des traitements tombe en panne, le travail est repris automatiquement par un autre nœud, on parle d’une réaffectation des tâches par le deamon JobTracker.

De plus, Hadoop est capable de détecter l’incident, déterminer la tâche concernée (code et données) et de relancer la tâche sur un autre nœud disposant des données nécessaires à la bonne exécution de la tâche (où l’intérêt de répliquer les données ).

Installation de la distribution Cloudera de Hadoop

Deux possibilité sont disponibles pour télécharger Hadoop. La première solution consiste à utiliser la version proposée par la fondation Apache. La seconde solution consiste à utiliser les distributions fournies par des entreprises qui font du service autour d’Hadoop comme Cloudera , Hortonworks , MapR Technologies, …

Installation de la distribution Cloudera de Hadoop

1. Installation de VMware ou VirtualBox

Nous utiliserons VMware , et on considere son installation comme acquise et ne sera pas détaillée dans ce cours. (mais si vous avez un problème, n’hesitez pas à nous écrire)

2. Installation de Hadoop (distribution CDH4)

La version disponible au moment d’criture de ce cours est CDH5.

Connectez-vous sur le site de Cloudera à l’adresse : www.cloudera.com/content/support/en/downloads.html

En bas de page, recherchez Cloudera QuickStart VMs et cliquez sur le bouton Download Now!

hadoop-cloudera

Dans la page qui s’ouvre, cliquez sur le bouton Download for WMWare.

hadoop-clouderaVMs

Remplissez le formulaire, lisez et acceptez les Terms & Conditions pour commencer le téléchargement.

Décompressez le fichier téléchargé et sauvegardez le dossier cloudera-quickstart-vm-version-virtualbox dans un répertoire de votre choix.

Executer le fichier cloudera-quickstart-demo-vm.vmx. Ce dernier s’ouvre directement dans VMWare. Une machine virtuelle, intitulée cloudera-quickstart-demo-vm, apparaît à gauche de la fenêtre dans la liste des machines disponibles. (image ci-dessous).

Ouverture de Cloudera VM sur VMWare

Ouverture de Cloudera VM sur VMWare

Configurez la taille de mémoire vive à 1 024 Mo. (cloudera-quickstart-demo-vm >> Edit virtual machine setting >> Memory >> 1024 >> OK)

Configuration de la taille de la mémoire vive de Cloudera

Configuration de la taille de la mémoire vive de Cloudera

Sélectionnez la machine virtuelle cloudera-quickstart-demo-vm et cliquez sur la flèche verte intitulée Démarrer pour la lancer.

Démarrer Cloudera sur VMWare

Démarrer Cloudera sur VMWare

Une à deux minutes, le bureau de CentOS s’affiche, une fenêtre Firefox affichant un message comme ci-dessous:

Cloudera

Cloudera

Fermez Firefox et lancez le terminal

Terminal CentOS

Terminal CentOS

Tappez hadoop dans la fenêtre du Terminal pour verifier son installation :

Verification de Installation de la distribution Cloudera de Hadoop

Verification de Installation de la distribution Cloudera de Hadoop

l’installation s’est correctement déroulée!!

Pour arreter la machine virtuelle, cliquez sur Power Off

Arreter VMWare

Arreter VMWare

Et pour garder l’état de la machine virtuelle (retrouver la machine virtuelle exactement dans l’état dans lequel vous l’avez quittée), cliquez sur Suspend.

Suspendre VMWare

Suspendre VMWare