In diesem Tutorial lernen Sie, wie Sie Hadoop mit MapReduce-Beispielen verwenden. Die verwendeten Eingabedaten sind SalesJan2009.csv. Es enthält vertriebsbezogene Informationen wie Produktname, Preis, Zahlungsmodus, Stadt, Land des Kunden usw. Ziel ist es, die Anzahl der in jedem Land verkauften Produkte herauszufinden.
In diesem Tutorial lernen Sie:
- Erstes Hadoop MapReduce-Programm
- Erläuterung der SalesMapper-Klasse
- Erläuterung der SalesCountryReducer-Klasse
- Erläuterung der SalesCountryDriver-Klasse
Erstes Hadoop MapReduce-Programm
In diesem MapReduce-Tutorial erstellen wir jetzt unser erstes Java MapReduce-Programm:
Stellen Sie sicher, dass Sie Hadoop installiert haben. Bevor Sie mit dem eigentlichen Prozess beginnen, ändern Sie den Benutzer in 'hduser' (ID, die während der Hadoop-Konfiguration verwendet wird, Sie können zu der Benutzer-ID wechseln, die während Ihrer Hadoop-Programmierkonfiguration verwendet wird).
su - hduser_
Schritt 1)
Erstellen Sie ein neues Verzeichnis mit dem Namen MapReduceTutorial, wie im folgenden MapReduce-Beispiel gezeigt
sudo mkdir MapReduceTutorial
Berechtigungen erteilen
sudo chmod -R 777 MapReduceTutorial
SalesMapper.java
package SalesCountry;import java.io.IOException;import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapred.*;public class SalesMapper extends MapReduceBase implements Mapper{private final static IntWritable one = new IntWritable(1);public void map(LongWritable key, Text value, OutputCollector output, Reporter reporter) throws IOException {String valueString = value.toString();String[] SingleCountryData = valueString.split(",");output.collect(new Text(SingleCountryData[7]), one);}}
SalesCountryReducer.java
package SalesCountry;import java.io.IOException;import java.util.*;import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapred.*;public class SalesCountryReducer extends MapReduceBase implements Reducer{public void reduce(Text t_key, Iterator values, OutputCollector output, Reporter reporter) throws IOException {Text key = t_key;int frequencyForCountry = 0;while (values.hasNext()) {// replace type of value with the actual type of our valueIntWritable value = (IntWritable) values.next();frequencyForCountry += value.get();}output.collect(key, new IntWritable(frequencyForCountry));}}
SalesCountryDriver.java
package SalesCountry;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.*;import org.apache.hadoop.mapred.*;public class SalesCountryDriver {public static void main(String[] args) {JobClient my_client = new JobClient();// Create a configuration object for the jobJobConf job_conf = new JobConf(SalesCountryDriver.class);// Set a name of the Jobjob_conf.setJobName("SalePerCountry");// Specify data type of output key and valuejob_conf.setOutputKeyClass(Text.class);job_conf.setOutputValueClass(IntWritable.class);// Specify names of Mapper and Reducer Classjob_conf.setMapperClass(SalesCountry.SalesMapper.class);job_conf.setReducerClass(SalesCountry.SalesCountryReducer.class);// Specify formats of the data type of Input and outputjob_conf.setInputFormat(TextInputFormat.class);job_conf.setOutputFormat(TextOutputFormat.class);// Set input and output directories using command line arguments,//arg[0] = name of input directory on HDFS, and arg[1] = name of output directory to be created to store the output file.FileInputFormat.setInputPaths(job_conf, new Path(args[0]));FileOutputFormat.setOutputPath(job_conf, new Path(args[1]));my_client.setConf(job_conf);try {// Run the jobJobClient.runJob(job_conf);} catch (Exception e) {e.printStackTrace();}}}
Laden Sie hier Dateien herunter
Überprüfen Sie die Dateiberechtigungen aller dieser Dateien
und wenn 'Lese'-Berechtigungen fehlen, gewähren Sie die gleichen-
Schritt 2)
Exportieren Sie den Klassenpfad wie im folgenden Hadoop-Beispiel gezeigt
export CLASSPATH="$HADOOP_HOME/share/hadoop/mapreduce/hadoop-mapreduce-client-core-2.2.0.jar:$HADOOP_HOME/share/hadoop/mapreduce/hadoop-mapreduce-client-common-2.2.0.jar:$HADOOP_HOME/share/hadoop/common/hadoop-common-2.2.0.jar:~/MapReduceTutorial/SalesCountry/*:$HADOOP_HOME/lib/*"
Schritt 3)
Kompilieren Sie Java-Dateien (diese Dateien befinden sich im Verzeichnis Final-MapReduceHandsOn ). Die Klassendateien werden im Paketverzeichnis abgelegt
javac -d . SalesMapper.java SalesCountryReducer.java SalesCountryDriver.java
Diese Warnung kann ignoriert werden.
Diese Kompilierung erstellt ein Verzeichnis in einem aktuellen Verzeichnis mit dem in der Java-Quelldatei angegebenen Paketnamen ( in unserem Fall dh SalesCountry ) und legt alle kompilierten Klassendateien darin ab.
Schritt 4)
Erstellen Sie eine neue Datei Manifest.txt
sudo gedit Manifest.txt
füge folgende Zeilen hinzu:
Main-Class: SalesCountry.SalesCountryDriver
SalesCountry.SalesCountryDriver ist der Name der Hauptklasse. Bitte beachten Sie, dass Sie am Ende dieser Zeile die Eingabetaste drücken müssen.
Schritt 5)
Erstellen Sie eine Jar-Datei
jar cfm ProductSalePerCountry.jar Manifest.txt SalesCountry/*.class
Überprüfen Sie, ob die JAR-Datei erstellt wurde
Schritt 6)
Starten Sie Hadoop
$HADOOP_HOME/sbin/start-dfs.sh
$HADOOP_HOME/sbin/start-yarn.sh
Schritt 7)
Kopieren Sie die Datei SalesJan2009.csv in ~ / inputMapReduce
Verwenden Sie nun den folgenden Befehl, um ~ / inputMapReduce nach HDFS zu kopieren .
$HADOOP_HOME/bin/hdfs dfs -copyFromLocal ~/inputMapReduce /
Wir können diese Warnung ignorieren.
Überprüfen Sie, ob eine Datei tatsächlich kopiert wurde oder nicht.
$HADOOP_HOME/bin/hdfs dfs -ls /inputMapReduce
Schritt 8)
Führen Sie den MapReduce-Job aus
$HADOOP_HOME/bin/hadoop jar ProductSalePerCountry.jar /inputMapReduce /mapreduce_output_sales
Dadurch wird ein Ausgabeverzeichnis mit dem Namen mapreduce_output_sales unter HDFS erstellt. Der Inhalt dieses Verzeichnisses ist eine Datei mit Produktverkäufen pro Land.
Schritt 9)
Das Ergebnis kann über die Befehlsschnittstelle angezeigt werden als:
$HADOOP_HOME/bin/hdfs dfs -cat /mapreduce_output_sales/part-00000
Die Ergebnisse können auch über eine Weboberfläche angezeigt werden
Öffnen Sie r in einem Webbrowser.
Wählen Sie nun 'Dateisystem durchsuchen ' und navigieren Sie zu / mapreduce_output_sales
Öffnen Sie Teil-r-00000
Erläuterung der SalesMapper-Klasse
In diesem Abschnitt werden wir die Implementierung der SalesMapper- Klasse verstehen .
1. Wir beginnen mit der Angabe eines Paketnamens für unsere Klasse. SalesCountry ist ein Name unseres Pakets. Bitte beachten Sie, dass die Ausgabe der Kompilierung SalesMapper.class in ein Verzeichnis mit dem folgenden Paketnamen verschoben wird: SalesCountry .
Anschließend importieren wir Bibliothekspakete.
Der folgende Schnappschuss zeigt eine Implementierung der SalesMapper- Klasse.
Beispielcode Erläuterung:
1. SalesMapper-Klassendefinition-
öffentliche Klasse SalesMapper erweitert MapReduceBase implementiert Mapper
Jede Mapper-Klasse muss von der MapReduceBase- Klasse erweitert werden und die Mapper- Schnittstelle implementieren .
2. Definieren der 'Map'-Funktion-
public void map(LongWritable key,Text value,OutputCollectoroutput,Reporter reporter) throws IOException
Der Hauptteil der Mapper-Klasse ist eine 'map ()' - Methode, die vier Argumente akzeptiert.
Bei jedem Aufruf der Methode 'map ()' wird ein Schlüssel-Wert- Paar ( 'Schlüssel' und 'Wert' in diesem Code) übergeben.
Die Methode 'map ()' beginnt mit der Aufteilung des Eingabetextes, der als Argument empfangen wird. Es verwendet den Tokenizer, um diese Zeilen in Wörter aufzuteilen.
String valueString = value.toString();String[] SingleCountryData = valueString.split(",");
Hier wird ',' als Trennzeichen verwendet.
Danach wird ein Paar unter Verwendung eines Datensatzes am 7. Index des Arrays 'SingleCountryData' und eines Werts '1' gebildet .
output.collect (neuer Text (SingleCountryData [7]), eins);
Wir entscheiden Rekord bei der 7. Index , weil wir brauchen Länderdaten und es ist an der 7. Index in Array befindet ‚SingleCountryData‘ .
Bitte beachten Sie, dass unsere Eingangsdaten in das unten stehende Format ist (wo Land auf 7 th Index, mit 0 als Ausgangsindex) -
Transaktionsdatum, Produkt, Preis, Zahlungstyp, Name, Stadt, Bundesland, Land , Account_Created, Last_Login, Latitude, Longitude
Eine Ausgabe von Mapper ist wieder ein Schlüssel-Wert- Paar, das mit der Methode 'collect ()' von 'OutputCollector' ausgegeben wird .
Erläuterung der SalesCountryReducer-Klasse
In diesem Abschnitt werden wir die Implementierung der SalesCountryReducer- Klasse verstehen .
1. Wir beginnen mit der Angabe eines Paketnamens für unsere Klasse. SalesCountry ist ein Name für unser Paket. Bitte beachten Sie, dass die Ausgabe der Kompilierung SalesCountryReducer.class in ein Verzeichnis mit dem folgenden Paketnamen verschoben wird: SalesCountry .
Anschließend importieren wir Bibliothekspakete.
Der folgende Schnappschuss zeigt eine Implementierung der SalesCountryReducer- Klasse.
Code Erläuterung:
1. SalesCountryReducer-Klassendefinition-
öffentliche Klasse SalesCountryReducer erweitert MapReduceBase implementiert Reducer
Hier sind die ersten beiden Datentypen 'Text' und 'IntWritable' der Datentyp des Eingabeschlüsselwerts für den Reduzierer.
Die Ausgabe des Mapper erfolgt in Form von
Die letzten beiden Datentypen 'Text' und 'IntWritable' sind Datentypen der Ausgabe, die vom Reduzierer in Form eines Schlüssel-Wert-Paares generiert werden.
Jede Reducer-Klasse muss von der MapReduceBase- Klasse erweitert werden und die Reducer- Schnittstelle implementieren .
2. Definieren Sie die Funktion "Reduzieren".
public void reduce( Text t_key,Iteratorvalues,OutputCollector output,Reporter reporter) throws IOException {
Eine Eingabe in die redu () -Methode ist ein Schlüssel mit einer Liste mehrerer Werte.
In unserem Fall wird es zum Beispiel sein
Dies wird dem Reduzierer als
Um Argumente dieser Form zu akzeptieren, werden die ersten beiden Datentypen verwendet, nämlich Text und Iterator
Das nächste Argument ist vom Typ OutputCollector
Die Methode redu () beginnt mit dem Kopieren des Schlüsselwerts und dem Initialisieren der Frequenzzählung auf 0.
Textschlüssel = t_key; int frequenzForCountry = 0;
Anschließend durchlaufen wir mithilfe der ' while'- Schleife die Liste der mit dem Schlüssel verknüpften Werte und berechnen die endgültige Häufigkeit durch Summieren aller Werte.
while (values.hasNext()) {// replace type of value with the actual type of our valueIntWritable value = (IntWritable) values.next();frequencyForCountry += value.get();}
Nun schieben wir das Ergebnis in Form eines Schlüssels an den Ausgangskollektor und erhalten die erhaltene Frequenzzählung .
Der folgende Code macht dies-
output.collect(key, new IntWritable(frequencyForCountry));
Erläuterung der SalesCountryDriver-Klasse
In diesem Abschnitt werden wir die Implementierung der SalesCountryDriver- Klasse verstehen
1. Wir beginnen mit der Angabe eines Paketnamens für unsere Klasse. SalesCountry ist ein Name für unser Paket. Bitte beachten Sie, dass die Ausgabe der Kompilierung SalesCountryDriver.class in ein Verzeichnis mit dem folgenden Paketnamen verschoben wird: SalesCountry .
Hier ist eine Zeile, in der der Paketname gefolgt von Code zum Importieren von Bibliothekspaketen angegeben wird.
2. Definieren Sie eine Treiberklasse, die einen neuen Clientjob und ein neues Konfigurationsobjekt erstellt und Mapper- und Reducer-Klassen ankündigt.
Die Treiberklasse ist dafür verantwortlich, dass unser MapReduce-Job in Hadoop ausgeführt wird. In dieser Klasse geben wir den Jobnamen , den Datentyp der Eingabe / Ausgabe und die Namen der Mapper- und Reducer-Klassen an .
3. Im folgenden Codeausschnitt legen wir Eingabe- und Ausgabeverzeichnisse fest, die zum Konsumieren des Eingabedatensatzes bzw. zum Erzeugen der Ausgabe verwendet werden.
arg [0] und arg [1] sind die Befehlszeilenargumente, die mit einem Befehl übergeben werden, der in MapReduce zum Anfassen angegeben ist, d. h.
$ HADOOP_HOME / bin / hadoop jar ProductSalePerCountry.jar / inputMapReduce / mapreduce_output_sales
4. Lösen Sie unseren Job aus
Unterhalb des Codes wird die Ausführung des MapReduce-Jobs gestartet.
try {// Run the jobJobClient.runJob(job_conf);} catch (Exception e) {e.printStackTrace();}