Antes de ello, repasemos algunos conceptos asociados a Hadoop. Como ya vimos anteriormente, su objetivo es proveer un entorno para el trabajo paralelo bajo el paradigma MapReduce, trabajos que se distribuyen para su procesamiento en un cluster. HDFS es el sistema de ficheros en dicho cluster que soporta esta operativa. Este sistema de ficheros permite, usando el disco local de cada máquina, generar un espacio unificado que desde el punto de vista lógico se comporta y gestiona como si de un solo disco se tratase. En realidad por debajo HDFS lo que hace es replicar la información en los diferentes nodos del cluster asegurando en todo momento la coherencia de datos y la tolerancia a fallos por caídas puntuales de nodos.
Lo primero que necesitaremos en un fichero de entrada con palabras que contar. Como es un test tampoco importa mucho cómo sea ese fichero. Por ejemplo, he generado uno de nombre input1.txt concatenando varios ficheros de logs del sistema. El fichero ocupa 542MB y tiene mas de 6 millones de líneas:
Como puede verse en la imagen anterior, primero deberemos copiar el fichero al espacio de disco HDFS del cluster. Eso se consigue haciendo uso de los comandos Hadoop asociados al tratamiento del almacenamiento (dfs), en concreto tendremos que crear un nuevo directorio que contenga el fichero de entrada y copiarlo en dicha localización dentro del espacio HDFS.
Pero antes de lanzar nuestro programa tendremos que crearlo. Hadoop está desarrollado con tecnología Java por lo que ofrece todo un conjunto completo de librerías bajo este lenguaje para desarrollar los aplicativos paralelos. En nuestro caso el código que especifica cómo debe ser tratada la entrada, wordcount, debe definir las acciones tanto para el mapeo (Map) como la reducción (Reduce). Siguiendo el ejemplo de la wiki comentado anteriormente, el código quedaría de la siguiente forma:
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package org.myorg; | |
import java.io.IOException; | |
import java.util.*; | |
import org.apache.hadoop.fs.Path; | |
import org.apache.hadoop.conf.*; | |
import org.apache.hadoop.io.*; | |
import org.apache.hadoop.mapreduce.*; | |
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; | |
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; | |
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; | |
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; | |
public class WordCount { | |
public static class Map extends Mapper<LongWritable, Text, Text, IntWritable> { | |
private final static IntWritable one = new IntWritable(1); | |
private Text word = new Text(); | |
public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { | |
String line = value.toString(); | |
StringTokenizer tokenizer = new StringTokenizer(line); | |
while (tokenizer.hasMoreTokens()) { | |
word.set(tokenizer.nextToken()); | |
context.write(word, one); | |
} | |
} | |
} | |
public static class Reduce extends Reducer<Text, IntWritable, Text, IntWritable> { | |
public void reduce(Text key, Iterable<IntWritable> values, Context context) | |
throws IOException, InterruptedException { | |
int sum = 0; | |
for (IntWritable val : values) { | |
sum += val.get(); | |
} | |
context.write(key, new IntWritable(sum)); | |
} | |
} | |
public static void main(String[] args) throws Exception { | |
Configuration conf = new Configuration(); | |
Job job = new Job(conf, "wordcount"); | |
job.setOutputKeyClass(Text.class); | |
job.setOutputValueClass(IntWritable.class); | |
job.setMapperClass(Map.class); | |
job.setReducerClass(Reduce.class); | |
job.setInputFormatClass(TextInputFormat.class); | |
job.setOutputFormatClass(TextOutputFormat.class); | |
FileInputFormat.addInputPath(job, new Path(args[0])); | |
FileOutputFormat.setOutputPath(job, new Path(args[1])); | |
job.waitForCompletion(true); | |
} | |
} |
- La función Map: en la que a cada palabra se le asigna el valor 1 (one)
- La función Reduce: en la que se agrega dicho valor por cada palabra mapeada
- La función main: principal, que es la que controla la creación de la tarea (Job), establece los parámetros iniciales de lanzamiento y controla su estado.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
[ambari-qa@master-node ~]$ hadoop dfs -ls ./entrada | |
Found 2 items | |
drwx------ - ambari-qa hdfs 0 2013-05-23 18:41 /user/ambari-qa/entrada/entrada | |
-rw------- 3 ambari-qa hdfs 568233984 2013-05-23 20:31 /user/ambari-qa/entrada/input1.txt | |
[ambari-qa@master-node ~]$ hadoop jar /usr/lib/hadoop/hadoop-examples.jar wordcount ./entrada/input1.txt ./salida | |
13/05/23 20:38:15 INFO input.FileInputFormat: Total input paths to process : 1 | |
13/05/23 20:38:15 INFO lzo.GPLNativeCodeLoader: Loaded native gpl library | |
13/05/23 20:38:15 INFO lzo.LzoCodec: Successfully loaded & initialized native-lzo library [hadoop-lzo rev cf4e7cbf8ed0f0622504d008101c2729dc0c9ff3] | |
13/05/23 20:38:15 WARN snappy.LoadSnappy: Snappy native library is available | |
13/05/23 20:38:15 INFO util.NativeCodeLoader: Loaded the native-hadoop library | |
13/05/23 20:38:15 INFO snappy.LoadSnappy: Snappy native library loaded | |
13/05/23 20:38:15 INFO mapred.JobClient: Running job: job_201305232015_0004 | |
13/05/23 20:38:16 INFO mapred.JobClient: map 0% reduce 0% | |
13/05/23 20:38:29 INFO mapred.JobClient: map 20% reduce 0% | |
13/05/23 20:38:39 INFO mapred.JobClient: map 20% reduce 6% | |
13/05/23 20:39:46 INFO mapred.JobClient: map 21% reduce 6% | |
13/05/23 20:39:50 INFO mapred.JobClient: map 22% reduce 6% | |
13/05/23 20:39:54 INFO mapred.JobClient: map 23% reduce 6% | |
13/05/23 20:39:56 INFO mapred.JobClient: map 24% reduce 6% | |
13/05/23 20:39:57 INFO mapred.JobClient: map 26% reduce 6% | |
13/05/23 20:40:00 INFO mapred.JobClient: map 27% reduce 6% | |
13/05/23 20:40:03 INFO mapred.JobClient: map 31% reduce 6% | |
13/05/23 20:40:13 INFO mapred.JobClient: map 33% reduce 6% | |
13/05/23 20:40:15 INFO mapred.JobClient: map 34% reduce 6% | |
13/05/23 20:40:16 INFO mapred.JobClient: map 35% reduce 6% | |
13/05/23 20:40:18 INFO mapred.JobClient: map 40% reduce 6% | |
13/05/23 20:40:21 INFO mapred.JobClient: map 42% reduce 6% | |
13/05/23 20:40:28 INFO mapred.JobClient: map 44% reduce 6% | |
13/05/23 20:40:30 INFO mapred.JobClient: map 45% reduce 6% | |
13/05/23 20:40:31 INFO mapred.JobClient: map 46% reduce 6% | |
13/05/23 20:40:33 INFO mapred.JobClient: map 50% reduce 6% | |
13/05/23 20:40:36 INFO mapred.JobClient: map 52% reduce 6% | |
13/05/23 20:40:40 INFO mapred.JobClient: map 54% reduce 6% | |
13/05/23 20:40:43 INFO mapred.JobClient: map 55% reduce 6% | |
13/05/23 20:40:45 INFO mapred.JobClient: map 57% reduce 6% | |
13/05/23 20:40:46 INFO mapred.JobClient: map 58% reduce 6% | |
13/05/23 20:40:48 INFO mapred.JobClient: map 61% reduce 6% | |
13/05/23 20:40:49 INFO mapred.JobClient: map 62% reduce 6% | |
13/05/23 20:40:52 INFO mapred.JobClient: map 64% reduce 6% | |
13/05/23 20:40:55 INFO mapred.JobClient: map 65% reduce 6% | |
13/05/23 20:40:57 INFO mapred.JobClient: map 66% reduce 6% | |
13/05/23 20:40:58 INFO mapred.JobClient: map 68% reduce 6% | |
13/05/23 20:41:00 INFO mapred.JobClient: map 71% reduce 6% | |
13/05/23 20:41:01 INFO mapred.JobClient: map 72% reduce 6% | |
13/05/23 20:41:03 INFO mapred.JobClient: map 73% reduce 6% | |
13/05/23 20:41:04 INFO mapred.JobClient: map 74% reduce 6% | |
13/05/23 20:41:07 INFO mapred.JobClient: map 75% reduce 6% | |
13/05/23 20:41:11 INFO mapred.JobClient: map 76% reduce 6% | |
13/05/23 20:41:13 INFO mapred.JobClient: map 79% reduce 6% | |
13/05/23 20:41:14 INFO mapred.JobClient: map 80% reduce 6% | |
13/05/23 20:41:16 INFO mapred.JobClient: map 84% reduce 6% | |
13/05/23 20:41:17 INFO mapred.JobClient: map 85% reduce 6% | |
13/05/23 20:41:19 INFO mapred.JobClient: map 86% reduce 6% | |
13/05/23 20:41:25 INFO mapred.JobClient: map 88% reduce 6% | |
13/05/23 20:41:26 INFO mapred.JobClient: map 89% reduce 6% | |
13/05/23 20:41:28 INFO mapred.JobClient: map 91% reduce 6% | |
13/05/23 20:41:29 INFO mapred.JobClient: map 94% reduce 6% | |
13/05/23 20:41:32 INFO mapred.JobClient: map 95% reduce 6% | |
13/05/23 20:41:37 INFO mapred.JobClient: map 97% reduce 6% | |
13/05/23 20:41:38 INFO mapred.JobClient: map 98% reduce 6% | |
13/05/23 20:41:41 INFO mapred.JobClient: map 100% reduce 6% | |
13/05/23 20:41:43 INFO mapred.JobClient: map 100% reduce 13% | |
13/05/23 20:41:47 INFO mapred.JobClient: map 100% reduce 100% | |
13/05/23 20:41:48 INFO mapred.JobClient: Job complete: job_201305232015_0004 | |
13/05/23 20:41:48 INFO mapred.JobClient: Counters: 30 | |
13/05/23 20:41:48 INFO mapred.JobClient: Job Counters | |
13/05/23 20:41:48 INFO mapred.JobClient: Launched reduce tasks=1 | |
13/05/23 20:41:48 INFO mapred.JobClient: SLOTS_MILLIS_MAPS=808592 | |
13/05/23 20:41:48 INFO mapred.JobClient: Total time spent by all reduces waiting after reserving slots (ms)=0 | |
13/05/23 20:41:48 INFO mapred.JobClient: Total time spent by all maps waiting after reserving slots (ms)=0 | |
13/05/23 20:41:48 INFO mapred.JobClient: Rack-local map tasks=4 | |
13/05/23 20:41:48 INFO mapred.JobClient: Launched map tasks=5 | |
13/05/23 20:41:48 INFO mapred.JobClient: Data-local map tasks=1 | |
13/05/23 20:41:48 INFO mapred.JobClient: SLOTS_MILLIS_REDUCES=195476 | |
13/05/23 20:41:48 INFO mapred.JobClient: File Output Format Counters | |
13/05/23 20:41:48 INFO mapred.JobClient: Bytes Written=67079 | |
13/05/23 20:41:48 INFO mapred.JobClient: FileSystemCounters | |
13/05/23 20:41:48 INFO mapred.JobClient: FILE_BYTES_READ=2856148 | |
13/05/23 20:41:48 INFO mapred.JobClient: HDFS_BYTES_READ=568758942 | |
13/05/23 20:41:48 INFO mapred.JobClient: FILE_BYTES_WRITTEN=3649388 | |
13/05/23 20:41:48 INFO mapred.JobClient: HDFS_BYTES_WRITTEN=67079 | |
13/05/23 20:41:48 INFO mapred.JobClient: File Input Format Counters | |
13/05/23 20:41:48 INFO mapred.JobClient: Bytes Read=568758272 | |
13/05/23 20:41:48 INFO mapred.JobClient: Map-Reduce Framework | |
13/05/23 20:41:48 INFO mapred.JobClient: Map output materialized bytes=428427 | |
13/05/23 20:41:48 INFO mapred.JobClient: Map input records=6433794 | |
13/05/23 20:41:48 INFO mapred.JobClient: Reduce shuffle bytes=428427 | |
13/05/23 20:41:48 INFO mapred.JobClient: Spilled Records=173561 | |
13/05/23 20:41:48 INFO mapred.JobClient: Map output bytes=871502402 | |
13/05/23 20:41:48 INFO mapred.JobClient: CPU time spent (ms)=172100 | |
13/05/23 20:41:48 INFO mapred.JobClient: Total committed heap usage (bytes)=2202488832 | |
13/05/23 20:41:48 INFO mapred.JobClient: Combine input records=76638558 | |
13/05/23 20:41:48 INFO mapred.JobClient: SPLIT_RAW_BYTES=670 | |
13/05/23 20:41:48 INFO mapred.JobClient: Reduce input records=22639 | |
13/05/23 20:41:48 INFO mapred.JobClient: Reduce input groups=3774 | |
13/05/23 20:41:48 INFO mapred.JobClient: Combine output records=143375 | |
13/05/23 20:41:48 INFO mapred.JobClient: Physical memory (bytes) snapshot=1995444224 | |
13/05/23 20:41:48 INFO mapred.JobClient: Reduce output records=3774 | |
13/05/23 20:41:48 INFO mapred.JobClient: Virtual memory (bytes) snapshot=7960113152 | |
13/05/23 20:41:48 INFO mapred.JobClient: Map output records=76517822 | |
[ambari-qa@master-node ~]$ hadoop dfs -ls ./salida | |
Found 1 items | |
-rw------- 3 ambari-qa hdfs 67079 2013-05-23 20:41 /user/ambari-qa/salida/part-r-00000 | |
[ambari-qa@master-node ~]$ hadoop dfs -cat ./salida/part-r-00000 > salida.txt | |
[ambari-qa@master-node ~]$ head -10 salida.txt | |
"pci=nocrs" 5910 | |
#0 10409 | |
#1 29277 | |
#2 10409 | |
#22 4499 | |
#3 10409 | |
#4 10409 | |
#5 10409 | |
#6 5910 | |
#7 5910 | |
[ambari-qa@master-node ~]$ |
De forma rápida tenemos acceso a información detallada sobre el trabajo en ejecución. Y si pulsamos sobre el job en concreto obtendremos un detalle aun mayor de cómo y cuando se han ido ejecutando las diferentes fases del trabajo:
Visualmente obtenemos información tan interesante como:
- El estado de ejecución: exitoso (SUCCESS) o erróneo (FAILED)
- La duración total del trabajo, casi 212 segundos
- 5 de los 6 nodos disponibles han estado implicados en el trabajo
- Detalles sobre el estado de ejecución de la tarea en el tiempo, como cuantos nodos han participado en las diferentes fases Map y Reduce.
En este caso, la ejecución completa del trabajo ha tardado 4 minutos y 35 segundos, es decir, 275 segundos. Comparativamente se ha obtenido una reducción del 30% en tiempo de ejecución gracias al uso del cluster.
Y esto es solo un ejemplo. Con tareas mejor diseñadas las reducciones podrían ser mas significativas. Y no olvidemos que el cluster, y su filosofía de trabajo paralelizada, nos permiten abordar problemas de cálculo con volúmenes de entrada muy grandes. En este ejemplo hemos usando un fichero con unas 6 millones de lineas. Pero imaginad que tuviera, por ejemplo, 10 o 100 veces más. El problema seguiría siendo resoluble en nuestro cluster Hadoop y sin embargo, por limitaciones de memoria, una sola máquina de forma secuencial dificilmente podría abordarlo.