jueves, 23 de mayo de 2013

WordCount: testing Hadoop

En la anterior entrada hablé de Ambari, la plataforma para el control de clusters Hadoop y servicios asociados. Hoy vamos a poner a prueba el funcionamiento de cluster. Para ello lanzaremos nuestro primer programa paralelo. Para lo ocasión usaremos uno de ejemplo de los que podréis encontrar en la Wiki de Hadoop y cuya función es contar las palabras que aparecen en uno (o varios) archivos de texto.

Antes de ello, repasemos algunos conceptos asociados a Hadoop. Como ya vimos anteriormente, su objetivo es proveer un entorno para el trabajo paralelo bajo el paradigma MapReduce, trabajos que se distribuyen para su procesamiento en un cluster. HDFS es el sistema de ficheros en dicho cluster que soporta esta operativa. Este sistema de ficheros permite, usando el disco local de cada máquina, generar un espacio unificado que desde el punto de vista lógico se comporta y gestiona como si de un solo disco se tratase. En realidad por debajo HDFS lo que hace es replicar la información en los diferentes nodos del cluster asegurando en todo momento la coherencia de datos y la tolerancia a fallos por caídas puntuales de nodos.

Lo primero que necesitaremos en un fichero de entrada con palabras que contar. Como es un test tampoco importa mucho cómo sea ese fichero. Por ejemplo, he generado uno de nombre input1.txt concatenando varios ficheros de logs del sistema. El fichero ocupa 542MB y tiene mas de 6 millones de líneas:


Como puede verse en la imagen anterior, primero deberemos copiar el fichero al espacio de disco HDFS del cluster. Eso se consigue haciendo uso de los comandos Hadoop asociados al tratamiento del almacenamiento (dfs), en concreto tendremos que crear un nuevo directorio que contenga el fichero de entrada y copiarlo en dicha localización dentro del espacio HDFS.

Pero antes de lanzar nuestro programa tendremos que crearlo. Hadoop está desarrollado con tecnología Java por lo que ofrece todo un conjunto completo de librerías bajo este lenguaje para desarrollar los aplicativos paralelos. En nuestro caso el código que especifica cómo debe ser tratada la entrada, wordcount, debe definir las acciones tanto para el mapeo (Map) como la reducción (Reduce). Siguiendo el ejemplo de la wiki comentado anteriormente, el código quedaría de la siguiente forma:

Es bastante intuitivo interpretar lo definido anteriormente. Se puede apreciar que están definidas claramente tres secciones:
  1. La función Map: en la que a cada palabra se le asigna el valor 1 (one)
  2. La función Reduce: en la que se agrega dicho valor por cada palabra mapeada
  3. La función main: principal, que es la que controla la creación de la tarea (Job), establece los parámetros iniciales de lanzamiento y controla su estado.
Definido el fichero de entrada y el programa paralelo solo resta ejecutarlo en nuestro cluster:

Como acabamos de comprobar, nuestro job paralelo se ha ejecutado en el cluster Hadoop satisfactoriamente, obteniendo una respuesta a nuestra necesidad: contar el numero de veces que cada palabra aparecía en el fichero de entrada. Pero recordemos que disponemos de la plataforma Ambari, nuestro controlador del cluster Hadoop. Si revisamos en su interfaz web podremos ver detalles interesantes como, por ejemplo, la información asociada al job:


De forma rápida tenemos acceso a información detallada sobre el trabajo en ejecución. Y si pulsamos sobre el job en concreto obtendremos un detalle aun mayor de cómo y cuando se han ido ejecutando las diferentes fases del trabajo:


Visualmente obtenemos información tan interesante como:
  • El estado de ejecución: exitoso (SUCCESS) o erróneo (FAILED)
  • La duración total del trabajo, casi 212 segundos
  • 5 de los 6 nodos disponibles han estado implicados en el trabajo
  • Detalles sobre el estado de ejecución de la tarea en el tiempo, como cuantos nodos han participado en las diferentes fases Map y Reduce.
Si comparamos esta tarea con realizar el cálculo haciendo uso de un script tradicional (secuencial) observamos el siguiente resultado:


En este caso, la ejecución completa del trabajo ha tardado 4 minutos y 35 segundos, es decir, 275 segundos. Comparativamente se ha obtenido una reducción del 30% en tiempo de ejecución gracias al uso del cluster.

Y esto es solo un ejemplo. Con tareas mejor diseñadas las reducciones podrían ser mas significativas. Y no olvidemos que el cluster, y su filosofía de trabajo paralelizada, nos permiten abordar problemas de cálculo con volúmenes de entrada muy grandes. En este ejemplo hemos usando un fichero con unas 6 millones de lineas. Pero imaginad que tuviera, por ejemplo, 10 o 100 veces más. El problema seguiría siendo resoluble en nuestro cluster Hadoop y sin embargo, por limitaciones de memoria, una sola máquina de forma secuencial dificilmente podría abordarlo.

No hay comentarios: