Пример Apache Spark: программа подсчета слов на Java
Апач Спарк
Apache Spark — это платформа обработки данных с открытым исходным кодом, которая может выполнять аналитические операции с большими данными в распределенной среде. Это был академический проект Калифорнийского университета в Беркли, и первоначально его начал Матей Захария из AMPLab Калифорнийского университета в Беркли в 2009 году. Apache Spark был создан на основе инструмента управления кластером, известного как Mesos. Позже он был изменен и обновлен, чтобы он мог работать в кластерной среде с распределенной обработкой.
Пример настройки проекта Apache Spark
Мы будем использовать Maven для создания примера проекта для демонстрации. Чтобы создать проект, выполните следующую команду в каталоге, который вы будете использовать в качестве рабочей области:
mvn archetype:generate -DgroupId=com.journaldev.sparkdemo -DartifactId=JD-Spark-WordCount -DarchetypeArtifactId=maven-archetype-quickstart -DinteractiveMode=false
Если вы запускаете maven в первый раз, для выполнения команды generate потребуется несколько секунд, потому что maven должен загрузить все необходимые плагины и артефакты, чтобы выполнить задачу генерации. Создав проект, не стесняйтесь открывать его в своей любимой среде IDE. Следующим шагом является добавление в проект соответствующих зависимостей Maven. Вот файл pom.xml
с соответствующими зависимостями:
<dependencies>
<!-- Import Spark -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>1.4.0</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.11</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>2.0.2</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<configuration>
<archive>
<manifest>
<addClasspath>true</addClasspath>
<classpathPrefix>lib/</classpathPrefix>
<mainClass>com.geekcap.javaworld.sparkexample.WordCount</mainClass>
</manifest>
</archive>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-dependency-plugin</artifactId>
<executions>
<execution>
<id>copy</id>
<phase>install</phase>
<goals>
<goal>copy-dependencies</goal>
</goals>
<configuration>
<outputDirectory>${project.build.directory}/lib</outputDirectory>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
Поскольку это проект на основе maven, на самом деле нет необходимости устанавливать и настраивать Apache Spark на вашем компьютере. Когда мы запустим этот проект, будет запущен экземпляр среды выполнения Apache Spark, и после того, как программа завершит выполнение, она будет закрыта. Наконец, чтобы понять все JAR-файлы, которые добавляются в проект, когда мы добавляем эту зависимость, мы можем запустить простую команду Maven, которая позволяет нам увидеть полное дерево зависимостей для проекта, когда мы добавляем к нему некоторые зависимости. Вот команда, которую мы можем использовать:
mvn dependency:tree
Когда мы запустим эту команду, она покажет нам следующее дерево зависимостей:
shubham:JD-Spark-WordCount shubham$ mvn dependency:tree
[INFO] Scanning for projects...
[WARNING]
[WARNING] Some problems were encountered while building the effective model for com.journaldev:java-word-count:jar:1.0-SNAPSHOT
[WARNING] 'build.plugins.plugin.version' for org.apache.maven.plugins:maven-jar-plugin is missing. @ line 41, column 21
[WARNING]
[WARNING] It is highly recommended to fix these problems because they threaten the stability of your build.
[WARNING]
[WARNING] For this reason, future Maven versions might no longer support building such malformed projects.
[WARNING]
[INFO]
[INFO] -------------------< com.journaldev:java-word-count >-------------------
[INFO] Building java-word-count 1.0-SNAPSHOT
[INFO] --------------------------------[ jar ]---------------------------------
[INFO]
[INFO] --- maven-dependency-plugin:2.8:tree (default-cli) @ java-word-count ---
[INFO] com.journaldev:java-word-count:jar:1.0-SNAPSHOT
[INFO] +- org.apache.spark:spark-core_2.11:jar:1.4.0:compile
[INFO] | +- com.twitter:chill_2.11:jar:0.5.0:compile
[INFO] | | \- com.esotericsoftware.kryo:kryo:jar:2.21:compile
[INFO] | | +- com.esotericsoftware.reflectasm:reflectasm:jar:shaded:1.07:compile
[INFO] | | +- com.esotericsoftware.minlog:minlog:jar:1.2:compile
[INFO] | | \- org.objenesis:objenesis:jar:1.2:compile
[INFO] | +- com.twitter:chill-java:jar:0.5.0:compile
[INFO] | +- org.apache.hadoop:hadoop-client:jar:2.2.0:compile
[INFO] | | +- org.apache.hadoop:hadoop-common:jar:2.2.0:compile
[INFO] | | | +- commons-cli:commons-cli:jar:1.2:compile
[INFO] | | | +- org.apache.commons:commons-math:jar:2.1:compile
[INFO] | | | +- xmlenc:xmlenc:jar:0.52:compile
[INFO] | | | +- commons-io:commons-io:jar:2.1:compile
[INFO] | | | +- commons-logging:commons-logging:jar:1.1.1:compile
[INFO] | | | +- commons-lang:commons-lang:jar:2.5:compile
[INFO] | | | +- commons-configuration:commons-configuration:jar:1.6:compile
[INFO] | | | | +- commons-collections:commons-collections:jar:3.2.1:compile
[INFO] | | | | +- commons-digester:commons-digester:jar:1.8:compile
[INFO] | | | | | \- commons-beanutils:commons-beanutils:jar:1.7.0:compile
[INFO] | | | | \- commons-beanutils:commons-beanutils-core:jar:1.8.0:compile
[INFO] | | | +- org.codehaus.jackson:jackson-core-asl:jar:1.8.8:compile
[INFO] | | | +- org.apache.avro:avro:jar:1.7.4:compile
[INFO] | | | +- com.google.protobuf:protobuf-java:jar:2.5.0:compile
[INFO] | | | +- org.apache.hadoop:hadoop-auth:jar:2.2.0:compile
[INFO] | | | \- org.apache.commons:commons-compress:jar:1.4.1:compile
[INFO] | | | \- org.tukaani:xz:jar:1.0:compile
[INFO] | | +- org.apache.hadoop:hadoop-hdfs:jar:2.2.0:compile
[INFO] | | | \- org.mortbay.jetty:jetty-util:jar:6.1.26:compile
[INFO] | | +- org.apache.hadoop:hadoop-mapreduce-client-app:jar:2.2.0:compile
[INFO] | | | +- org.apache.hadoop:hadoop-mapreduce-client-common:jar:2.2.0:compile
[INFO] | | | | +- org.apache.hadoop:hadoop-yarn-client:jar:2.2.0:compile
[INFO] | | | | | +- com.google.inject:guice:jar:3.0:compile
[INFO] | | | | | | +- javax.inject:javax.inject:jar:1:compile
[INFO] | | | | | | \- aopalliance:aopalliance:jar:1.0:compile
[INFO] | | | | | +- com.sun.jersey.jersey-test-framework:jersey-test-framework-grizzly2:jar:1.9:compile
[INFO] | | | | | | +- com.sun.jersey.jersey-test-framework:jersey-test-framework-core:jar:1.9:compile
[INFO] | | | | | | | +- javax.servlet:javax.servlet-api:jar:3.0.1:compile
[INFO] | | | | | | | \- com.sun.jersey:jersey-client:jar:1.9:compile
[INFO] | | | | | | \- com.sun.jersey:jersey-grizzly2:jar:1.9:compile
[INFO] | | | | | | +- org.glassfish.grizzly:grizzly-https:jar:2.1.2:compile
[INFO] | | | | | | | \- org.glassfish.grizzly:grizzly-framework:jar:2.1.2:compile
[INFO] | | | | | | | \- org.glassfish.gmbal:gmbal-api-only:jar:3.0.0-b023:compile
[INFO] | | | | | | | \- org.glassfish.external:management-api:jar:3.0.0-b012:compile
[INFO] | | | | | | +- org.glassfish.grizzly:grizzly-http-server:jar:2.1.2:compile
[INFO] | | | | | | | \- org.glassfish.grizzly:grizzly-rcm:jar:2.1.2:compile
[INFO] | | | | | | +- org.glassfish.grizzly:grizzly-http-servlet:jar:2.1.2:compile
[INFO] | | | | | | \- org.glassfish:javax.servlet:jar:3.1:compile
[INFO] | | | | | +- com.sun.jersey:jersey-json:jar:1.9:compile
[INFO] | | | | | | +- org.codehaus.jettison:jettison:jar:1.1:compile
[INFO] | | | | | | | \- stax:stax-api:jar:1.0.1:compile
[INFO] | | | | | | +- com.sun.xml.bind:jaxb-impl:jar:2.2.3-1:compile
[INFO] | | | | | | | \- javax.xml.bind:jaxb-api:jar:2.2.2:compile
[INFO] | | | | | | | \- javax.activation:activation:jar:1.1:compile
[INFO] | | | | | | +- org.codehaus.jackson:jackson-jaxrs:jar:1.8.3:compile
[INFO] | | | | | | \- org.codehaus.jackson:jackson-xc:jar:1.8.3:compile
[INFO] | | | | | \- com.sun.jersey.contribs:jersey-guice:jar:1.9:compile
[INFO] | | | | \- org.apache.hadoop:hadoop-yarn-server-common:jar:2.2.0:compile
[INFO] | | | \- org.apache.hadoop:hadoop-mapreduce-client-shuffle:jar:2.2.0:compile
[INFO] | | +- org.apache.hadoop:hadoop-yarn-api:jar:2.2.0:compile
[INFO] | | +- org.apache.hadoop:hadoop-mapreduce-client-core:jar:2.2.0:compile
[INFO] | | | \- org.apache.hadoop:hadoop-yarn-common:jar:2.2.0:compile
[INFO] | | +- org.apache.hadoop:hadoop-mapreduce-client-jobclient:jar:2.2.0:compile
[INFO] | | \- org.apache.hadoop:hadoop-annotations:jar:2.2.0:compile
[INFO] | +- org.apache.spark:spark-launcher_2.11:jar:1.4.0:compile
[INFO] | +- org.apache.spark:spark-network-common_2.11:jar:1.4.0:compile
[INFO] | +- org.apache.spark:spark-network-shuffle_2.11:jar:1.4.0:compile
[INFO] | +- org.apache.spark:spark-unsafe_2.11:jar:1.4.0:compile
[INFO] | +- net.java.dev.jets3t:jets3t:jar:0.7.1:compile
[INFO] | | +- commons-codec:commons-codec:jar:1.3:compile
[INFO] | | \- commons-httpclient:commons-httpclient:jar:3.1:compile
[INFO] | +- org.apache.curator:curator-recipes:jar:2.4.0:compile
[INFO] | | +- org.apache.curator:curator-framework:jar:2.4.0:compile
[INFO] | | | \- org.apache.curator:curator-client:jar:2.4.0:compile
[INFO] | | +- org.apache.zookeeper:zookeeper:jar:3.4.5:compile
[INFO] | | | \- jline:jline:jar:0.9.94:compile
[INFO] | | \- com.google.guava:guava:jar:14.0.1:compile
[INFO] | +- org.eclipse.jetty.orbit:javax.servlet:jar:3.0.0.v201112011016:compile
[INFO] | +- org.apache.commons:commons-lang3:jar:3.3.2:compile
[INFO] | +- org.apache.commons:commons-math3:jar:3.4.1:compile
[INFO] | +- com.google.code.findbugs:jsr305:jar:1.3.9:compile
[INFO] | +- org.slf4j:slf4j-api:jar:1.7.10:compile
[INFO] | +- org.slf4j:jul-to-slf4j:jar:1.7.10:compile
[INFO] | +- org.slf4j:jcl-over-slf4j:jar:1.7.10:compile
[INFO] | +- log4j:log4j:jar:1.2.17:compile
[INFO] | +- org.slf4j:slf4j-log4j12:jar:1.7.10:compile
[INFO] | +- com.ning:compress-lzf:jar:1.0.3:compile
[INFO] | +- org.xerial.snappy:snappy-java:jar:1.1.1.7:compile
[INFO] | +- net.jpountz.lz4:lz4:jar:1.2.0:compile
[INFO] | +- org.roaringbitmap:RoaringBitmap:jar:0.4.5:compile
[INFO] | +- commons-net:commons-net:jar:2.2:compile
[INFO] | +- org.spark-project.akka:akka-remote_2.11:jar:2.3.4-spark:compile
[INFO] | | +- org.spark-project.akka:akka-actor_2.11:jar:2.3.4-spark:compile
[INFO] | | | \- com.typesafe:config:jar:1.2.1:compile
[INFO] | | +- io.netty:netty:jar:3.8.0.Final:compile
[INFO] | | +- org.spark-project.protobuf:protobuf-java:jar:2.5.0-spark:compile
[INFO] | | \- org.uncommons.maths:uncommons-maths:jar:1.2.2a:compile
[INFO] | +- org.spark-project.akka:akka-slf4j_2.11:jar:2.3.4-spark:compile
[INFO] | +- org.scala-lang:scala-library:jar:2.11.6:compile
[INFO] | +- org.json4s:json4s-jackson_2.11:jar:3.2.10:compile
[INFO] | | \- org.json4s:json4s-core_2.11:jar:3.2.10:compile
[INFO] | | +- org.json4s:json4s-ast_2.11:jar:3.2.10:compile
[INFO] | | \- org.scala-lang:scalap:jar:2.11.0:compile
[INFO] | | \- org.scala-lang:scala-compiler:jar:2.11.0:compile
[INFO] | | +- org.scala-lang.modules:scala-xml_2.11:jar:1.0.1:compile
[INFO] | | \- org.scala-lang.modules:scala-parser-combinators_2.11:jar:1.0.1:compile
[INFO] | +- com.sun.jersey:jersey-server:jar:1.9:compile
[INFO] | | \- asm:asm:jar:3.1:compile
[INFO] | +- com.sun.jersey:jersey-core:jar:1.9:compile
[INFO] | +- org.apache.mesos:mesos:jar:shaded-protobuf:0.21.1:compile
[INFO] | +- io.netty:netty-all:jar:4.0.23.Final:compile
[INFO] | +- com.clearspring.analytics:stream:jar:2.7.0:compile
[INFO] | +- io.dropwizard.metrics:metrics-core:jar:3.1.0:compile
[INFO] | +- io.dropwizard.metrics:metrics-jvm:jar:3.1.0:compile
[INFO] | +- io.dropwizard.metrics:metrics-json:jar:3.1.0:compile
[INFO] | +- io.dropwizard.metrics:metrics-graphite:jar:3.1.0:compile
[INFO] | +- com.fasterxml.jackson.core:jackson-databind:jar:2.4.4:compile
[INFO] | | +- com.fasterxml.jackson.core:jackson-annotations:jar:2.4.0:compile
[INFO] | | \- com.fasterxml.jackson.core:jackson-core:jar:2.4.4:compile
[INFO] | +- com.fasterxml.jackson.module:jackson-module-scala_2.11:jar:2.4.4:compile
[INFO] | | +- org.scala-lang:scala-reflect:jar:2.11.2:compile
[INFO] | | \- com.thoughtworks.paranamer:paranamer:jar:2.6:compile
[INFO] | +- org.apache.ivy:ivy:jar:2.4.0:compile
[INFO] | +- oro:oro:jar:2.0.8:compile
[INFO] | +- org.tachyonproject:tachyon-client:jar:0.6.4:compile
[INFO] | | \- org.tachyonproject:tachyon:jar:0.6.4:compile
[INFO] | +- net.razorvine:pyrolite:jar:4.4:compile
[INFO] | +- net.sf.py4j:py4j:jar:0.8.2.1:compile
[INFO] | \- org.spark-project.spark:unused:jar:1.0.0:compile
[INFO] \- junit:junit:jar:4.11:test
[INFO] \- org.hamcrest:hamcrest-core:jar:1.3:test
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 2.987 s
[INFO] Finished at: 2018-04-07T15:50:34+05:30
[INFO] ------------------------------------------------------------------------
Всего с двумя добавленными зависимостями Spark собрал все необходимые зависимости в проекте, который включает в себя зависимости Scala, а Apache Spark написан на самом Scala.
Создание входного файла
Поскольку мы собираемся создать программу Word Counter, мы создадим образец входного файла для нашего проекта в корневом каталоге нашего проекта с именем input.txt. Поместите в него любой контент, мы используем следующий текст:
Hello, my name is Shubham and I am author at JournalDev . JournalDev is a great website to ready
great lessons about Java, Big Data, Python and many more Programming languages.
Big Data lessons are difficult to find but at JournalDev , you can find some excellent
pieces of lessons written on Big Data.
Не стесняйтесь использовать любой текст в этом файле.
Структура проекта
Создание счетчика слов
Теперь мы готовы начать писать нашу программу. Когда вы начинаете работать с программами для работы с большими данными, импорт может создать много путаницы. Чтобы этого избежать, вот все импорты, которые мы будем использовать в нашем проекте:
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;
import java.util.Arrays;
Далее, вот структура нашего класса, которую мы будем использовать:
package com.journaldev.sparkdemo;
...imports...
public class WordCounter {
private static void wordCount(String fileName) {
...
}
public static void main(String[] args) {
...
}
}
Вся логика будет находиться внутри метода wordCount
. Мы начнем с определения объекта для класса SparkConf
. Объект этого класса используется для установки различных параметров Spark в качестве пар ключ-значение для программы. Мы предоставляем только простые параметры:
SparkConf sparkConf = new SparkConf().setMaster("local").setAppName("JD Word Counter");
master
указывает локальный, что означает, что эта программа должна подключаться к потоку Spark, работающему на localhost
. Имя приложения — это просто способ предоставить Spark метаданные приложения. Теперь мы можем создать объект контекста Spark с помощью этого объекта конфигурации:
JavaSparkContext sparkContext = new JavaSparkContext(sparkConf);
Spark рассматривает каждый ресурс, который он получает для обработки, как RDD (устойчивые распределенные наборы данных), что помогает ему организовать данные в структуру данных поиска, которая намного эффективнее для анализа. Теперь мы преобразуем входной файл в сам объект JavaRDD
:
JavaRDD<String> inputFile = sparkContext.textFile(fileName);
Теперь мы будем использовать API Java 8 для обработки файла JavaRDD
и разделения слов, содержащихся в файле, на отдельные слова:
JavaRDD<String> wordsFromFile = inputFile.flatMap(content -> Arrays.asList(content.split(" ")));
Опять же, мы используем метод Java 8 mapToPair(...)
для подсчета слов и предоставления пары word, number
, которую можно представить в качестве вывода:
JavaPairRDD countData = wordsFromFile.mapToPair(t -> new Tuple2(t, 1)).reduceByKey((x, y) -> (int) x + (int) y);
Теперь мы можем сохранить выходной файл как текстовый файл:
countData.saveAsTextFile("CountData");
Наконец, мы можем указать точку входа в нашу программу с помощью метода main()
:
public static void main(String[] args) {
if (args.length == 0) {
System.out.println("No files provided.");
System.exit(0);
}
wordCount(args[0]);
}
Полный файл выглядит так:
package com.journaldev.sparkdemo;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;
import java.util.Arrays;
public class WordCounter {
private static void wordCount(String fileName) {
SparkConf sparkConf = new SparkConf().setMaster("local").setAppName("JD Word Counter");
JavaSparkContext sparkContext = new JavaSparkContext(sparkConf);
JavaRDD<String> inputFile = sparkContext.textFile(fileName);
JavaRDD<String> wordsFromFile = inputFile.flatMap(content -> Arrays.asList(content.split(" ")));
JavaPairRDD countData = wordsFromFile.mapToPair(t -> new Tuple2(t, 1)).reduceByKey((x, y) -> (int) x + (int) y);
countData.saveAsTextFile("CountData");
}
public static void main(String[] args) {
if (args.length == 0) {
System.out.println("No files provided.");
System.exit(0);
}
wordCount(args[0]);
}
}
Теперь мы перейдем к запуску этой программы с использованием самого Maven.
Запуск приложения
Чтобы запустить приложение, перейдите в корневой каталог программы и выполните следующую команду:
mvn exec:java -Dexec.mainClass=com.journaldev.sparkdemo.WordCounter -Dexec.args="input.txt"
Заключение
В этом уроке мы увидели, как мы можем использовать Apache Spark в проекте на основе Maven, чтобы создать простую, но эффективную программу счетчика Word. Прочтите другие сообщения о больших данных, чтобы получить более подробные сведения о доступных инструментах и платформах обработки больших данных.
Загрузите исходный код
Скачать проект Spark WordCounter: JD-Spark-WordCount