Поиск по сайту:

Пример Apache Spark: программа подсчета слов на Java


Апач Спарк

Apache Spark — это платформа обработки данных с открытым исходным кодом, которая может выполнять аналитические операции с большими данными в распределенной среде. Это был академический проект Калифорнийского университета в Беркли, и первоначально его начал Матей Захария из AMPLab Калифорнийского университета в Беркли в 2009 году. Apache Spark был создан на основе инструмента управления кластером, известного как Mesos. Позже он был изменен и обновлен, чтобы он мог работать в кластерной среде с распределенной обработкой.

Пример настройки проекта Apache Spark

Мы будем использовать Maven для создания примера проекта для демонстрации. Чтобы создать проект, выполните следующую команду в каталоге, который вы будете использовать в качестве рабочей области:

mvn archetype:generate -DgroupId=com.journaldev.sparkdemo -DartifactId=JD-Spark-WordCount -DarchetypeArtifactId=maven-archetype-quickstart -DinteractiveMode=false

Если вы запускаете maven в первый раз, для выполнения команды generate потребуется несколько секунд, потому что maven должен загрузить все необходимые плагины и артефакты, чтобы выполнить задачу генерации. Создав проект, не стесняйтесь открывать его в своей любимой среде IDE. Следующим шагом является добавление в проект соответствующих зависимостей Maven. Вот файл pom.xml с соответствующими зависимостями:

<dependencies>

    <!-- Import Spark -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.11</artifactId>
        <version>1.4.0</version>
    </dependency>

    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.11</version>
        <scope>test</scope>
    </dependency>

</dependencies>

<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>2.0.2</version>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
            </configuration>
        </plugin>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-jar-plugin</artifactId>
            <configuration>
                <archive>
                    <manifest>
                        <addClasspath>true</addClasspath>
                        <classpathPrefix>lib/</classpathPrefix>
                        <mainClass>com.geekcap.javaworld.sparkexample.WordCount</mainClass>
                    </manifest>
                </archive>
            </configuration>
        </plugin>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-dependency-plugin</artifactId>
            <executions>
                <execution>
                    <id>copy</id>
                    <phase>install</phase>
                    <goals>
                        <goal>copy-dependencies</goal>
                    </goals>
                    <configuration>
                        <outputDirectory>${project.build.directory}/lib</outputDirectory>
                    </configuration>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>

Поскольку это проект на основе maven, на самом деле нет необходимости устанавливать и настраивать Apache Spark на вашем компьютере. Когда мы запустим этот проект, будет запущен экземпляр среды выполнения Apache Spark, и после того, как программа завершит выполнение, она будет закрыта. Наконец, чтобы понять все JAR-файлы, которые добавляются в проект, когда мы добавляем эту зависимость, мы можем запустить простую команду Maven, которая позволяет нам увидеть полное дерево зависимостей для проекта, когда мы добавляем к нему некоторые зависимости. Вот команда, которую мы можем использовать:

mvn dependency:tree

Когда мы запустим эту команду, она покажет нам следующее дерево зависимостей:

shubham:JD-Spark-WordCount shubham$ mvn dependency:tree

[INFO] Scanning for projects...
[WARNING]
[WARNING] Some problems were encountered while building the effective model for com.journaldev:java-word-count:jar:1.0-SNAPSHOT
[WARNING] 'build.plugins.plugin.version' for org.apache.maven.plugins:maven-jar-plugin is missing. @ line 41, column 21
[WARNING]
[WARNING] It is highly recommended to fix these problems because they threaten the stability of your build.
[WARNING]
[WARNING] For this reason, future Maven versions might no longer support building such malformed projects.
[WARNING]
[INFO]
[INFO] -------------------< com.journaldev:java-word-count >-------------------
[INFO] Building java-word-count 1.0-SNAPSHOT
[INFO] --------------------------------[ jar ]---------------------------------
[INFO]
[INFO] --- maven-dependency-plugin:2.8:tree (default-cli) @ java-word-count ---
[INFO] com.journaldev:java-word-count:jar:1.0-SNAPSHOT
[INFO] +- org.apache.spark:spark-core_2.11:jar:1.4.0:compile
[INFO] |  +- com.twitter:chill_2.11:jar:0.5.0:compile
[INFO] |  |  \- com.esotericsoftware.kryo:kryo:jar:2.21:compile
[INFO] |  |     +- com.esotericsoftware.reflectasm:reflectasm:jar:shaded:1.07:compile
[INFO] |  |     +- com.esotericsoftware.minlog:minlog:jar:1.2:compile
[INFO] |  |     \- org.objenesis:objenesis:jar:1.2:compile
[INFO] |  +- com.twitter:chill-java:jar:0.5.0:compile
[INFO] |  +- org.apache.hadoop:hadoop-client:jar:2.2.0:compile
[INFO] |  |  +- org.apache.hadoop:hadoop-common:jar:2.2.0:compile
[INFO] |  |  |  +- commons-cli:commons-cli:jar:1.2:compile
[INFO] |  |  |  +- org.apache.commons:commons-math:jar:2.1:compile
[INFO] |  |  |  +- xmlenc:xmlenc:jar:0.52:compile
[INFO] |  |  |  +- commons-io:commons-io:jar:2.1:compile
[INFO] |  |  |  +- commons-logging:commons-logging:jar:1.1.1:compile
[INFO] |  |  |  +- commons-lang:commons-lang:jar:2.5:compile
[INFO] |  |  |  +- commons-configuration:commons-configuration:jar:1.6:compile
[INFO] |  |  |  |  +- commons-collections:commons-collections:jar:3.2.1:compile
[INFO] |  |  |  |  +- commons-digester:commons-digester:jar:1.8:compile
[INFO] |  |  |  |  |  \- commons-beanutils:commons-beanutils:jar:1.7.0:compile
[INFO] |  |  |  |  \- commons-beanutils:commons-beanutils-core:jar:1.8.0:compile
[INFO] |  |  |  +- org.codehaus.jackson:jackson-core-asl:jar:1.8.8:compile
[INFO] |  |  |  +- org.apache.avro:avro:jar:1.7.4:compile
[INFO] |  |  |  +- com.google.protobuf:protobuf-java:jar:2.5.0:compile
[INFO] |  |  |  +- org.apache.hadoop:hadoop-auth:jar:2.2.0:compile
[INFO] |  |  |  \- org.apache.commons:commons-compress:jar:1.4.1:compile
[INFO] |  |  |     \- org.tukaani:xz:jar:1.0:compile
[INFO] |  |  +- org.apache.hadoop:hadoop-hdfs:jar:2.2.0:compile
[INFO] |  |  |  \- org.mortbay.jetty:jetty-util:jar:6.1.26:compile
[INFO] |  |  +- org.apache.hadoop:hadoop-mapreduce-client-app:jar:2.2.0:compile
[INFO] |  |  |  +- org.apache.hadoop:hadoop-mapreduce-client-common:jar:2.2.0:compile
[INFO] |  |  |  |  +- org.apache.hadoop:hadoop-yarn-client:jar:2.2.0:compile
[INFO] |  |  |  |  |  +- com.google.inject:guice:jar:3.0:compile
[INFO] |  |  |  |  |  |  +- javax.inject:javax.inject:jar:1:compile
[INFO] |  |  |  |  |  |  \- aopalliance:aopalliance:jar:1.0:compile
[INFO] |  |  |  |  |  +- com.sun.jersey.jersey-test-framework:jersey-test-framework-grizzly2:jar:1.9:compile
[INFO] |  |  |  |  |  |  +- com.sun.jersey.jersey-test-framework:jersey-test-framework-core:jar:1.9:compile
[INFO] |  |  |  |  |  |  |  +- javax.servlet:javax.servlet-api:jar:3.0.1:compile
[INFO] |  |  |  |  |  |  |  \- com.sun.jersey:jersey-client:jar:1.9:compile
[INFO] |  |  |  |  |  |  \- com.sun.jersey:jersey-grizzly2:jar:1.9:compile
[INFO] |  |  |  |  |  |     +- org.glassfish.grizzly:grizzly-https:jar:2.1.2:compile
[INFO] |  |  |  |  |  |     |  \- org.glassfish.grizzly:grizzly-framework:jar:2.1.2:compile
[INFO] |  |  |  |  |  |     |     \- org.glassfish.gmbal:gmbal-api-only:jar:3.0.0-b023:compile
[INFO] |  |  |  |  |  |     |        \- org.glassfish.external:management-api:jar:3.0.0-b012:compile
[INFO] |  |  |  |  |  |     +- org.glassfish.grizzly:grizzly-http-server:jar:2.1.2:compile
[INFO] |  |  |  |  |  |     |  \- org.glassfish.grizzly:grizzly-rcm:jar:2.1.2:compile
[INFO] |  |  |  |  |  |     +- org.glassfish.grizzly:grizzly-http-servlet:jar:2.1.2:compile
[INFO] |  |  |  |  |  |     \- org.glassfish:javax.servlet:jar:3.1:compile
[INFO] |  |  |  |  |  +- com.sun.jersey:jersey-json:jar:1.9:compile
[INFO] |  |  |  |  |  |  +- org.codehaus.jettison:jettison:jar:1.1:compile
[INFO] |  |  |  |  |  |  |  \- stax:stax-api:jar:1.0.1:compile
[INFO] |  |  |  |  |  |  +- com.sun.xml.bind:jaxb-impl:jar:2.2.3-1:compile
[INFO] |  |  |  |  |  |  |  \- javax.xml.bind:jaxb-api:jar:2.2.2:compile
[INFO] |  |  |  |  |  |  |     \- javax.activation:activation:jar:1.1:compile
[INFO] |  |  |  |  |  |  +- org.codehaus.jackson:jackson-jaxrs:jar:1.8.3:compile
[INFO] |  |  |  |  |  |  \- org.codehaus.jackson:jackson-xc:jar:1.8.3:compile
[INFO] |  |  |  |  |  \- com.sun.jersey.contribs:jersey-guice:jar:1.9:compile
[INFO] |  |  |  |  \- org.apache.hadoop:hadoop-yarn-server-common:jar:2.2.0:compile
[INFO] |  |  |  \- org.apache.hadoop:hadoop-mapreduce-client-shuffle:jar:2.2.0:compile
[INFO] |  |  +- org.apache.hadoop:hadoop-yarn-api:jar:2.2.0:compile
[INFO] |  |  +- org.apache.hadoop:hadoop-mapreduce-client-core:jar:2.2.0:compile
[INFO] |  |  |  \- org.apache.hadoop:hadoop-yarn-common:jar:2.2.0:compile
[INFO] |  |  +- org.apache.hadoop:hadoop-mapreduce-client-jobclient:jar:2.2.0:compile
[INFO] |  |  \- org.apache.hadoop:hadoop-annotations:jar:2.2.0:compile
[INFO] |  +- org.apache.spark:spark-launcher_2.11:jar:1.4.0:compile
[INFO] |  +- org.apache.spark:spark-network-common_2.11:jar:1.4.0:compile
[INFO] |  +- org.apache.spark:spark-network-shuffle_2.11:jar:1.4.0:compile
[INFO] |  +- org.apache.spark:spark-unsafe_2.11:jar:1.4.0:compile
[INFO] |  +- net.java.dev.jets3t:jets3t:jar:0.7.1:compile
[INFO] |  |  +- commons-codec:commons-codec:jar:1.3:compile
[INFO] |  |  \- commons-httpclient:commons-httpclient:jar:3.1:compile
[INFO] |  +- org.apache.curator:curator-recipes:jar:2.4.0:compile
[INFO] |  |  +- org.apache.curator:curator-framework:jar:2.4.0:compile
[INFO] |  |  |  \- org.apache.curator:curator-client:jar:2.4.0:compile
[INFO] |  |  +- org.apache.zookeeper:zookeeper:jar:3.4.5:compile
[INFO] |  |  |  \- jline:jline:jar:0.9.94:compile
[INFO] |  |  \- com.google.guava:guava:jar:14.0.1:compile
[INFO] |  +- org.eclipse.jetty.orbit:javax.servlet:jar:3.0.0.v201112011016:compile
[INFO] |  +- org.apache.commons:commons-lang3:jar:3.3.2:compile
[INFO] |  +- org.apache.commons:commons-math3:jar:3.4.1:compile
[INFO] |  +- com.google.code.findbugs:jsr305:jar:1.3.9:compile
[INFO] |  +- org.slf4j:slf4j-api:jar:1.7.10:compile
[INFO] |  +- org.slf4j:jul-to-slf4j:jar:1.7.10:compile
[INFO] |  +- org.slf4j:jcl-over-slf4j:jar:1.7.10:compile
[INFO] |  +- log4j:log4j:jar:1.2.17:compile
[INFO] |  +- org.slf4j:slf4j-log4j12:jar:1.7.10:compile
[INFO] |  +- com.ning:compress-lzf:jar:1.0.3:compile
[INFO] |  +- org.xerial.snappy:snappy-java:jar:1.1.1.7:compile
[INFO] |  +- net.jpountz.lz4:lz4:jar:1.2.0:compile
[INFO] |  +- org.roaringbitmap:RoaringBitmap:jar:0.4.5:compile
[INFO] |  +- commons-net:commons-net:jar:2.2:compile
[INFO] |  +- org.spark-project.akka:akka-remote_2.11:jar:2.3.4-spark:compile
[INFO] |  |  +- org.spark-project.akka:akka-actor_2.11:jar:2.3.4-spark:compile
[INFO] |  |  |  \- com.typesafe:config:jar:1.2.1:compile
[INFO] |  |  +- io.netty:netty:jar:3.8.0.Final:compile
[INFO] |  |  +- org.spark-project.protobuf:protobuf-java:jar:2.5.0-spark:compile
[INFO] |  |  \- org.uncommons.maths:uncommons-maths:jar:1.2.2a:compile
[INFO] |  +- org.spark-project.akka:akka-slf4j_2.11:jar:2.3.4-spark:compile
[INFO] |  +- org.scala-lang:scala-library:jar:2.11.6:compile
[INFO] |  +- org.json4s:json4s-jackson_2.11:jar:3.2.10:compile
[INFO] |  |  \- org.json4s:json4s-core_2.11:jar:3.2.10:compile
[INFO] |  |     +- org.json4s:json4s-ast_2.11:jar:3.2.10:compile
[INFO] |  |     \- org.scala-lang:scalap:jar:2.11.0:compile
[INFO] |  |        \- org.scala-lang:scala-compiler:jar:2.11.0:compile
[INFO] |  |           +- org.scala-lang.modules:scala-xml_2.11:jar:1.0.1:compile
[INFO] |  |           \- org.scala-lang.modules:scala-parser-combinators_2.11:jar:1.0.1:compile
[INFO] |  +- com.sun.jersey:jersey-server:jar:1.9:compile
[INFO] |  |  \- asm:asm:jar:3.1:compile
[INFO] |  +- com.sun.jersey:jersey-core:jar:1.9:compile
[INFO] |  +- org.apache.mesos:mesos:jar:shaded-protobuf:0.21.1:compile
[INFO] |  +- io.netty:netty-all:jar:4.0.23.Final:compile
[INFO] |  +- com.clearspring.analytics:stream:jar:2.7.0:compile
[INFO] |  +- io.dropwizard.metrics:metrics-core:jar:3.1.0:compile
[INFO] |  +- io.dropwizard.metrics:metrics-jvm:jar:3.1.0:compile
[INFO] |  +- io.dropwizard.metrics:metrics-json:jar:3.1.0:compile
[INFO] |  +- io.dropwizard.metrics:metrics-graphite:jar:3.1.0:compile
[INFO] |  +- com.fasterxml.jackson.core:jackson-databind:jar:2.4.4:compile
[INFO] |  |  +- com.fasterxml.jackson.core:jackson-annotations:jar:2.4.0:compile
[INFO] |  |  \- com.fasterxml.jackson.core:jackson-core:jar:2.4.4:compile
[INFO] |  +- com.fasterxml.jackson.module:jackson-module-scala_2.11:jar:2.4.4:compile
[INFO] |  |  +- org.scala-lang:scala-reflect:jar:2.11.2:compile
[INFO] |  |  \- com.thoughtworks.paranamer:paranamer:jar:2.6:compile
[INFO] |  +- org.apache.ivy:ivy:jar:2.4.0:compile
[INFO] |  +- oro:oro:jar:2.0.8:compile
[INFO] |  +- org.tachyonproject:tachyon-client:jar:0.6.4:compile
[INFO] |  |  \- org.tachyonproject:tachyon:jar:0.6.4:compile
[INFO] |  +- net.razorvine:pyrolite:jar:4.4:compile
[INFO] |  +- net.sf.py4j:py4j:jar:0.8.2.1:compile
[INFO] |  \- org.spark-project.spark:unused:jar:1.0.0:compile
[INFO] \- junit:junit:jar:4.11:test
[INFO]    \- org.hamcrest:hamcrest-core:jar:1.3:test
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 2.987 s
[INFO] Finished at: 2018-04-07T15:50:34+05:30
[INFO] ------------------------------------------------------------------------

Всего с двумя добавленными зависимостями Spark собрал все необходимые зависимости в проекте, который включает в себя зависимости Scala, а Apache Spark написан на самом Scala.

Создание входного файла

Поскольку мы собираемся создать программу Word Counter, мы создадим образец входного файла для нашего проекта в корневом каталоге нашего проекта с именем input.txt. Поместите в него любой контент, мы используем следующий текст:

Hello, my name is Shubham and I am author at JournalDev . JournalDev is a great website to ready
great lessons about Java, Big Data, Python and many more Programming languages.

Big Data lessons are difficult to find but at JournalDev , you can find some excellent
pieces of lessons written on Big Data.

Не стесняйтесь использовать любой текст в этом файле.

Структура проекта

Создание счетчика слов

Теперь мы готовы начать писать нашу программу. Когда вы начинаете работать с программами для работы с большими данными, импорт может создать много путаницы. Чтобы этого избежать, вот все импорты, которые мы будем использовать в нашем проекте:

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

import java.util.Arrays;

Далее, вот структура нашего класса, которую мы будем использовать:

package com.journaldev.sparkdemo;

...imports...

public class WordCounter {

    private static void wordCount(String fileName) {
        ...
    }

    public static void main(String[] args) {
        ...
    }
}

Вся логика будет находиться внутри метода wordCount. Мы начнем с определения объекта для класса SparkConf. Объект этого класса используется для установки различных параметров Spark в качестве пар ключ-значение для программы. Мы предоставляем только простые параметры:

SparkConf sparkConf = new SparkConf().setMaster("local").setAppName("JD Word Counter");

master указывает локальный, что означает, что эта программа должна подключаться к потоку Spark, работающему на localhost. Имя приложения — это просто способ предоставить Spark метаданные приложения. Теперь мы можем создать объект контекста Spark с помощью этого объекта конфигурации:

JavaSparkContext sparkContext = new JavaSparkContext(sparkConf);

Spark рассматривает каждый ресурс, который он получает для обработки, как RDD (устойчивые распределенные наборы данных), что помогает ему организовать данные в структуру данных поиска, которая намного эффективнее для анализа. Теперь мы преобразуем входной файл в сам объект JavaRDD:

JavaRDD<String> inputFile = sparkContext.textFile(fileName);

Теперь мы будем использовать API Java 8 для обработки файла JavaRDD и разделения слов, содержащихся в файле, на отдельные слова:

JavaRDD<String> wordsFromFile = inputFile.flatMap(content -> Arrays.asList(content.split(" ")));

Опять же, мы используем метод Java 8 mapToPair(...) для подсчета слов и предоставления пары word, number, которую можно представить в качестве вывода:

JavaPairRDD countData = wordsFromFile.mapToPair(t -> new Tuple2(t, 1)).reduceByKey((x, y) -> (int) x + (int) y);

Теперь мы можем сохранить выходной файл как текстовый файл:

countData.saveAsTextFile("CountData");

Наконец, мы можем указать точку входа в нашу программу с помощью метода main():

public static void main(String[] args) {
    if (args.length == 0) {
        System.out.println("No files provided.");
        System.exit(0);
    }
    wordCount(args[0]);
}

Полный файл выглядит так:

package com.journaldev.sparkdemo;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

import java.util.Arrays;

public class WordCounter {

    private static void wordCount(String fileName) {

        SparkConf sparkConf = new SparkConf().setMaster("local").setAppName("JD Word Counter");

        JavaSparkContext sparkContext = new JavaSparkContext(sparkConf);

        JavaRDD<String> inputFile = sparkContext.textFile(fileName);

        JavaRDD<String> wordsFromFile = inputFile.flatMap(content -> Arrays.asList(content.split(" ")));

        JavaPairRDD countData = wordsFromFile.mapToPair(t -> new Tuple2(t, 1)).reduceByKey((x, y) -> (int) x + (int) y);

        countData.saveAsTextFile("CountData");
    }

    public static void main(String[] args) {

        if (args.length == 0) {
            System.out.println("No files provided.");
            System.exit(0);
        }

        wordCount(args[0]);
    }
}

Теперь мы перейдем к запуску этой программы с использованием самого Maven.

Запуск приложения

Чтобы запустить приложение, перейдите в корневой каталог программы и выполните следующую команду:

mvn exec:java -Dexec.mainClass=com.journaldev.sparkdemo.WordCounter -Dexec.args="input.txt"

Заключение

В этом уроке мы увидели, как мы можем использовать Apache Spark в проекте на основе Maven, чтобы создать простую, но эффективную программу счетчика Word. Прочтите другие сообщения о больших данных, чтобы получить более подробные сведения о доступных инструментах и платформах обработки больших данных.

Загрузите исходный код

Скачать проект Spark WordCounter: JD-Spark-WordCount