jueves, 23 de mayo de 2013

WordCount: testing Hadoop

En la anterior entrada hablé de Ambari, la plataforma para el control de clusters Hadoop y servicios asociados. Hoy vamos a poner a prueba el funcionamiento de cluster. Para ello lanzaremos nuestro primer programa paralelo. Para lo ocasión usaremos uno de ejemplo de los que podréis encontrar en la Wiki de Hadoop y cuya función es contar las palabras que aparecen en uno (o varios) archivos de texto.

Antes de ello, repasemos algunos conceptos asociados a Hadoop. Como ya vimos anteriormente, su objetivo es proveer un entorno para el trabajo paralelo bajo el paradigma MapReduce, trabajos que se distribuyen para su procesamiento en un cluster. HDFS es el sistema de ficheros en dicho cluster que soporta esta operativa. Este sistema de ficheros permite, usando el disco local de cada máquina, generar un espacio unificado que desde el punto de vista lógico se comporta y gestiona como si de un solo disco se tratase. En realidad por debajo HDFS lo que hace es replicar la información en los diferentes nodos del cluster asegurando en todo momento la coherencia de datos y la tolerancia a fallos por caídas puntuales de nodos.

Lo primero que necesitaremos en un fichero de entrada con palabras que contar. Como es un test tampoco importa mucho cómo sea ese fichero. Por ejemplo, he generado uno de nombre input1.txt concatenando varios ficheros de logs del sistema. El fichero ocupa 542MB y tiene mas de 6 millones de líneas:


Como puede verse en la imagen anterior, primero deberemos copiar el fichero al espacio de disco HDFS del cluster. Eso se consigue haciendo uso de los comandos Hadoop asociados al tratamiento del almacenamiento (dfs), en concreto tendremos que crear un nuevo directorio que contenga el fichero de entrada y copiarlo en dicha localización dentro del espacio HDFS.

Pero antes de lanzar nuestro programa tendremos que crearlo. Hadoop está desarrollado con tecnología Java por lo que ofrece todo un conjunto completo de librerías bajo este lenguaje para desarrollar los aplicativos paralelos. En nuestro caso el código que especifica cómo debe ser tratada la entrada, wordcount, debe definir las acciones tanto para el mapeo (Map) como la reducción (Reduce). Siguiendo el ejemplo de la wiki comentado anteriormente, el código quedaría de la siguiente forma:

package org.myorg;
import java.io.IOException;
import java.util.*;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
public class WordCount {
public static class Map extends Mapper<LongWritable, Text, Text, IntWritable> {
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String line = value.toString();
StringTokenizer tokenizer = new StringTokenizer(line);
while (tokenizer.hasMoreTokens()) {
word.set(tokenizer.nextToken());
context.write(word, one);
}
}
}
public static class Reduce extends Reducer<Text, IntWritable, Text, IntWritable> {
public void reduce(Text key, Iterable<IntWritable> values, Context context)
throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
context.write(key, new IntWritable(sum));
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = new Job(conf, "wordcount");
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
job.setMapperClass(Map.class);
job.setReducerClass(Reduce.class);
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
job.waitForCompletion(true);
}
}
view raw WordCount.java hosted with ❤ by GitHub
Es bastante intuitivo interpretar lo definido anteriormente. Se puede apreciar que están definidas claramente tres secciones:
  1. La función Map: en la que a cada palabra se le asigna el valor 1 (one)
  2. La función Reduce: en la que se agrega dicho valor por cada palabra mapeada
  3. La función main: principal, que es la que controla la creación de la tarea (Job), establece los parámetros iniciales de lanzamiento y controla su estado.
Definido el fichero de entrada y el programa paralelo solo resta ejecutarlo en nuestro cluster:

[ambari-qa@master-node ~]$ hadoop dfs -ls ./entrada
Found 2 items
drwx------ - ambari-qa hdfs 0 2013-05-23 18:41 /user/ambari-qa/entrada/entrada
-rw------- 3 ambari-qa hdfs 568233984 2013-05-23 20:31 /user/ambari-qa/entrada/input1.txt
[ambari-qa@master-node ~]$ hadoop jar /usr/lib/hadoop/hadoop-examples.jar wordcount ./entrada/input1.txt ./salida
13/05/23 20:38:15 INFO input.FileInputFormat: Total input paths to process : 1
13/05/23 20:38:15 INFO lzo.GPLNativeCodeLoader: Loaded native gpl library
13/05/23 20:38:15 INFO lzo.LzoCodec: Successfully loaded & initialized native-lzo library [hadoop-lzo rev cf4e7cbf8ed0f0622504d008101c2729dc0c9ff3]
13/05/23 20:38:15 WARN snappy.LoadSnappy: Snappy native library is available
13/05/23 20:38:15 INFO util.NativeCodeLoader: Loaded the native-hadoop library
13/05/23 20:38:15 INFO snappy.LoadSnappy: Snappy native library loaded
13/05/23 20:38:15 INFO mapred.JobClient: Running job: job_201305232015_0004
13/05/23 20:38:16 INFO mapred.JobClient: map 0% reduce 0%
13/05/23 20:38:29 INFO mapred.JobClient: map 20% reduce 0%
13/05/23 20:38:39 INFO mapred.JobClient: map 20% reduce 6%
13/05/23 20:39:46 INFO mapred.JobClient: map 21% reduce 6%
13/05/23 20:39:50 INFO mapred.JobClient: map 22% reduce 6%
13/05/23 20:39:54 INFO mapred.JobClient: map 23% reduce 6%
13/05/23 20:39:56 INFO mapred.JobClient: map 24% reduce 6%
13/05/23 20:39:57 INFO mapred.JobClient: map 26% reduce 6%
13/05/23 20:40:00 INFO mapred.JobClient: map 27% reduce 6%
13/05/23 20:40:03 INFO mapred.JobClient: map 31% reduce 6%
13/05/23 20:40:13 INFO mapred.JobClient: map 33% reduce 6%
13/05/23 20:40:15 INFO mapred.JobClient: map 34% reduce 6%
13/05/23 20:40:16 INFO mapred.JobClient: map 35% reduce 6%
13/05/23 20:40:18 INFO mapred.JobClient: map 40% reduce 6%
13/05/23 20:40:21 INFO mapred.JobClient: map 42% reduce 6%
13/05/23 20:40:28 INFO mapred.JobClient: map 44% reduce 6%
13/05/23 20:40:30 INFO mapred.JobClient: map 45% reduce 6%
13/05/23 20:40:31 INFO mapred.JobClient: map 46% reduce 6%
13/05/23 20:40:33 INFO mapred.JobClient: map 50% reduce 6%
13/05/23 20:40:36 INFO mapred.JobClient: map 52% reduce 6%
13/05/23 20:40:40 INFO mapred.JobClient: map 54% reduce 6%
13/05/23 20:40:43 INFO mapred.JobClient: map 55% reduce 6%
13/05/23 20:40:45 INFO mapred.JobClient: map 57% reduce 6%
13/05/23 20:40:46 INFO mapred.JobClient: map 58% reduce 6%
13/05/23 20:40:48 INFO mapred.JobClient: map 61% reduce 6%
13/05/23 20:40:49 INFO mapred.JobClient: map 62% reduce 6%
13/05/23 20:40:52 INFO mapred.JobClient: map 64% reduce 6%
13/05/23 20:40:55 INFO mapred.JobClient: map 65% reduce 6%
13/05/23 20:40:57 INFO mapred.JobClient: map 66% reduce 6%
13/05/23 20:40:58 INFO mapred.JobClient: map 68% reduce 6%
13/05/23 20:41:00 INFO mapred.JobClient: map 71% reduce 6%
13/05/23 20:41:01 INFO mapred.JobClient: map 72% reduce 6%
13/05/23 20:41:03 INFO mapred.JobClient: map 73% reduce 6%
13/05/23 20:41:04 INFO mapred.JobClient: map 74% reduce 6%
13/05/23 20:41:07 INFO mapred.JobClient: map 75% reduce 6%
13/05/23 20:41:11 INFO mapred.JobClient: map 76% reduce 6%
13/05/23 20:41:13 INFO mapred.JobClient: map 79% reduce 6%
13/05/23 20:41:14 INFO mapred.JobClient: map 80% reduce 6%
13/05/23 20:41:16 INFO mapred.JobClient: map 84% reduce 6%
13/05/23 20:41:17 INFO mapred.JobClient: map 85% reduce 6%
13/05/23 20:41:19 INFO mapred.JobClient: map 86% reduce 6%
13/05/23 20:41:25 INFO mapred.JobClient: map 88% reduce 6%
13/05/23 20:41:26 INFO mapred.JobClient: map 89% reduce 6%
13/05/23 20:41:28 INFO mapred.JobClient: map 91% reduce 6%
13/05/23 20:41:29 INFO mapred.JobClient: map 94% reduce 6%
13/05/23 20:41:32 INFO mapred.JobClient: map 95% reduce 6%
13/05/23 20:41:37 INFO mapred.JobClient: map 97% reduce 6%
13/05/23 20:41:38 INFO mapred.JobClient: map 98% reduce 6%
13/05/23 20:41:41 INFO mapred.JobClient: map 100% reduce 6%
13/05/23 20:41:43 INFO mapred.JobClient: map 100% reduce 13%
13/05/23 20:41:47 INFO mapred.JobClient: map 100% reduce 100%
13/05/23 20:41:48 INFO mapred.JobClient: Job complete: job_201305232015_0004
13/05/23 20:41:48 INFO mapred.JobClient: Counters: 30
13/05/23 20:41:48 INFO mapred.JobClient: Job Counters
13/05/23 20:41:48 INFO mapred.JobClient: Launched reduce tasks=1
13/05/23 20:41:48 INFO mapred.JobClient: SLOTS_MILLIS_MAPS=808592
13/05/23 20:41:48 INFO mapred.JobClient: Total time spent by all reduces waiting after reserving slots (ms)=0
13/05/23 20:41:48 INFO mapred.JobClient: Total time spent by all maps waiting after reserving slots (ms)=0
13/05/23 20:41:48 INFO mapred.JobClient: Rack-local map tasks=4
13/05/23 20:41:48 INFO mapred.JobClient: Launched map tasks=5
13/05/23 20:41:48 INFO mapred.JobClient: Data-local map tasks=1
13/05/23 20:41:48 INFO mapred.JobClient: SLOTS_MILLIS_REDUCES=195476
13/05/23 20:41:48 INFO mapred.JobClient: File Output Format Counters
13/05/23 20:41:48 INFO mapred.JobClient: Bytes Written=67079
13/05/23 20:41:48 INFO mapred.JobClient: FileSystemCounters
13/05/23 20:41:48 INFO mapred.JobClient: FILE_BYTES_READ=2856148
13/05/23 20:41:48 INFO mapred.JobClient: HDFS_BYTES_READ=568758942
13/05/23 20:41:48 INFO mapred.JobClient: FILE_BYTES_WRITTEN=3649388
13/05/23 20:41:48 INFO mapred.JobClient: HDFS_BYTES_WRITTEN=67079
13/05/23 20:41:48 INFO mapred.JobClient: File Input Format Counters
13/05/23 20:41:48 INFO mapred.JobClient: Bytes Read=568758272
13/05/23 20:41:48 INFO mapred.JobClient: Map-Reduce Framework
13/05/23 20:41:48 INFO mapred.JobClient: Map output materialized bytes=428427
13/05/23 20:41:48 INFO mapred.JobClient: Map input records=6433794
13/05/23 20:41:48 INFO mapred.JobClient: Reduce shuffle bytes=428427
13/05/23 20:41:48 INFO mapred.JobClient: Spilled Records=173561
13/05/23 20:41:48 INFO mapred.JobClient: Map output bytes=871502402
13/05/23 20:41:48 INFO mapred.JobClient: CPU time spent (ms)=172100
13/05/23 20:41:48 INFO mapred.JobClient: Total committed heap usage (bytes)=2202488832
13/05/23 20:41:48 INFO mapred.JobClient: Combine input records=76638558
13/05/23 20:41:48 INFO mapred.JobClient: SPLIT_RAW_BYTES=670
13/05/23 20:41:48 INFO mapred.JobClient: Reduce input records=22639
13/05/23 20:41:48 INFO mapred.JobClient: Reduce input groups=3774
13/05/23 20:41:48 INFO mapred.JobClient: Combine output records=143375
13/05/23 20:41:48 INFO mapred.JobClient: Physical memory (bytes) snapshot=1995444224
13/05/23 20:41:48 INFO mapred.JobClient: Reduce output records=3774
13/05/23 20:41:48 INFO mapred.JobClient: Virtual memory (bytes) snapshot=7960113152
13/05/23 20:41:48 INFO mapred.JobClient: Map output records=76517822
[ambari-qa@master-node ~]$ hadoop dfs -ls ./salida
Found 1 items
-rw------- 3 ambari-qa hdfs 67079 2013-05-23 20:41 /user/ambari-qa/salida/part-r-00000
[ambari-qa@master-node ~]$ hadoop dfs -cat ./salida/part-r-00000 > salida.txt
[ambari-qa@master-node ~]$ head -10 salida.txt
"pci=nocrs" 5910
#0 10409
#1 29277
#2 10409
#22 4499
#3 10409
#4 10409
#5 10409
#6 5910
#7 5910
[ambari-qa@master-node ~]$
view raw wordcount_exec hosted with ❤ by GitHub
Como acabamos de comprobar, nuestro job paralelo se ha ejecutado en el cluster Hadoop satisfactoriamente, obteniendo una respuesta a nuestra necesidad: contar el numero de veces que cada palabra aparecía en el fichero de entrada. Pero recordemos que disponemos de la plataforma Ambari, nuestro controlador del cluster Hadoop. Si revisamos en su interfaz web podremos ver detalles interesantes como, por ejemplo, la información asociada al job:


De forma rápida tenemos acceso a información detallada sobre el trabajo en ejecución. Y si pulsamos sobre el job en concreto obtendremos un detalle aun mayor de cómo y cuando se han ido ejecutando las diferentes fases del trabajo:


Visualmente obtenemos información tan interesante como:
  • El estado de ejecución: exitoso (SUCCESS) o erróneo (FAILED)
  • La duración total del trabajo, casi 212 segundos
  • 5 de los 6 nodos disponibles han estado implicados en el trabajo
  • Detalles sobre el estado de ejecución de la tarea en el tiempo, como cuantos nodos han participado en las diferentes fases Map y Reduce.
Si comparamos esta tarea con realizar el cálculo haciendo uso de un script tradicional (secuencial) observamos el siguiente resultado:


En este caso, la ejecución completa del trabajo ha tardado 4 minutos y 35 segundos, es decir, 275 segundos. Comparativamente se ha obtenido una reducción del 30% en tiempo de ejecución gracias al uso del cluster.

Y esto es solo un ejemplo. Con tareas mejor diseñadas las reducciones podrían ser mas significativas. Y no olvidemos que el cluster, y su filosofía de trabajo paralelizada, nos permiten abordar problemas de cálculo con volúmenes de entrada muy grandes. En este ejemplo hemos usando un fichero con unas 6 millones de lineas. Pero imaginad que tuviera, por ejemplo, 10 o 100 veces más. El problema seguiría siendo resoluble en nuestro cluster Hadoop y sin embargo, por limitaciones de memoria, una sola máquina de forma secuencial dificilmente podría abordarlo.

3 comentarios:

sergio199722 dijo...

Hola muchas gracias por la información, necesito por favor de tu ayuda, soy nuevo en hadoop, la verdad solo pude instalarlo pero no se como se maneja, me pidieron un trabajo donde cargue algún trabajo de texto y me diga cuantas palabras hay en el texto, la verdad es que no entiendo nada de la información que hay en Internet, ya que no se como se usa hadoop, ya lo tengo instalado pero no se como realizar un trabajo de esos

JuanK dijo...

Hola soy nuevo en el Hadoop y MapReduce y necesito aprobar un curso pero no entiendo porque no me funcionan los comandos, necesito aprobar un test de dos preguntas:
1.- Descargue el texto de Alicia en el país de las maravillas de http://www.gutenberg.org/files/11/11-0.txt (si lo redirige a una página con una ventana emergente de bienvenida, haga clic en "Texto sin formato UTF-8 "en esa página o simplemente descargue el archivo adjunto a continuación) y ejecute el recuento de palabras en él. Esto se puede hacer usando comandos de hadoop. ¿Cuántas veces aparece la palabra Cheshire? (No incluya la palabra 'Cheshire con un apóstrofe. La cadena ->' Cheshire <- no cuenta)
2.- El conjunto de aplicaciones MapReduce de ejemplo incluye wordmedian, que calcula la longitud media de las palabras en un archivo de texto. Si ejecuta wordmedian usando words.txt (el texto de Shakespeare) como entrada, ¿cuál es la longitud de palabra mediana?

Tenga en cuenta que wordmedian imprime la longitud mediana en la terminal al final del trabajo MapReduce; el archivo de salida no contiene la longitud media.
He intentado por varias veces ejecutando los comandos pero no me da la respuesta .ayuda si alguien sabe por favor..
Saludos Cordiales,

Juan Carlos

Valdivia dijo...

Juan Carlos, encontraste la respuesta? Estoy en la misma situación, si me puedes ayudar, te agradezco mucho, gracias.