[解決済み] format("kafka") で "Failed to find data source: kafka." とエラーになるのはなぜですか?(uber-jarを使用しても)失敗しますか?
質問内容
HDP-2.6.3.0とSpark2パッケージ2.2.0を使用しています。
Structured Streaming APIを使用してKafkaコンシューマを作成しようとしていますが、クラスターにジョブを送信した後、次のエラーが発生します。
Exception in thread "main" java.lang.ClassNotFoundException: Failed to find data source: kafka. Please find packages at http://spark.apache.org/third-party-projects.html
at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:553)
at org.apache.spark.sql.execution.datasources.DataSource.providingClass$lzycompute(DataSource.scala:89)
at org.apache.spark.sql.execution.datasources.DataSource.providingClass(DataSource.scala:89)
at org.apache.spark.sql.execution.datasources.DataSource.sourceSchema(DataSource.scala:198)
at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo$lzycompute(DataSource.scala:90)
at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo(DataSource.scala:90)
at org.apache.spark.sql.execution.streaming.StreamingRelation$.apply(StreamingRelation.scala:30)
at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:150)
at com.example.KafkaConsumer.main(KafkaConsumer.java:21)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$runMain(SparkSubmit.scala:782)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:119)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.ClassNotFoundException: kafka.DefaultSource
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at org.apache.spark.sql.execution.datasources.DataSource$anonfun$22$anonfun$apply$14.apply(DataSource.scala:537)
at org.apache.spark.sql.execution.datasources.DataSource$anonfun$22$anonfun$apply$14.apply(DataSource.scala:537)
at scala.util.Try$.apply(Try.scala:192)
at org.apache.spark.sql.execution.datasources.DataSource$anonfun$22.apply(DataSource.scala:537)
at org.apache.spark.sql.execution.datasources.DataSource$anonfun$22.apply(DataSource.scala:537)
at scala.util.Try.orElse(Try.scala:84)
at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:537)
... 17 more
以下
spark-submit
コマンドを使用します。
$SPARK_HOME/bin/spark-submit \
--master yarn \
--deploy-mode client \
--class com.example.KafkaConsumer \
--executor-cores 2 \
--executor-memory 512m \
--driver-memory 512m \
sample-kafka-consumer-0.0.1-SNAPSHOT.jar
私のjavaコードです。
package com.example;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
public class KafkaConsumer {
public static void main(String[] args) {
SparkSession spark = SparkSession
.builder()
.appName("kafkaConsumerApp")
.getOrCreate();
Dataset<Row> ds = spark
.readStream()
.format("kafka")
.option("kafka.bootstrap.servers", "dog.mercadoanalitico.com.br:6667")
.option("subscribe", "my-topic")
.load();
}
}
pom.xmlを使用します。
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.example</groupId>
<artifactId>sample-kafka-consumer</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>jar</packaging>
<dependencies>
<!-- spark -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>2.2.0</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>2.2.0</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql-kafka-0-10_2.11</artifactId>
<version>2.2.0</version>
</dependency>
<!-- kafka -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
<version>0.10.1.0</version>
</dependency>
</dependencies>
<repositories>
<repository>
<id>local-maven-repo</id>
<url>file:///${project.basedir}/local-maven-repo</url>
</repository>
</repositories>
<build>
<!-- Include resources folder in the .jar -->
<resources>
<resource>
<directory>${basedir}/src/main/resources</directory>
</resource>
</resources>
<plugins>
<!-- Plugin to compile the source. -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.6.1</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
<!-- Plugin to include all the dependencies in the .jar and set the main class. -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.0.0</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<filters>
<!-- This filter is to workaround the problem caused by included signed jars.
java.lang.SecurityException: Invalid signature file digest for Manifest main attributes
-->
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<transformers>
<transformer
implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>com.example.KafkaConsumer</mainClass>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
[アップデート】ユーバー・ジャー
以下は、uber-jar を生成するために pom.xml で使用される設定です。
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.0.0</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<filters>
<!-- This filter is to workaround the problem caused by included signed jars.
java.lang.SecurityException: Invalid signature file digest for Manifest main attributes
-->
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<transformers>
<transformer
implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>com.example.KafkaConsumer</mainClass>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
解決方法は?
kafka
データソースは
外部
モジュールで、Spark アプリケーションではデフォルトで使用できません。
として定義する必要があります。
pom.xml
(しかし、これはSparkアプリケーションで使用するための最初のステップに過ぎません。
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql-kafka-0-10_2.11</artifactId>
<version>2.2.0</version>
</dependency>
この依存関係で、いわゆる
ユーバージャー
を使うか (これはかなり大きな jar ファイルになり、サブミット時間が長くなります)、あるいは、すべての依存関係をバンドルしている
--packages
(または、より柔軟性の低い
--jars
で依存関係を追加するオプションがあります。
spark-submit
の時間になります。
(必要なjarをHadoop HDFSに保存したり、Hadoopディストリビューション固有の方法でSparkアプリケーションの依存関係を定義するなどのオプションもありますが、ここではシンプルに考えてみましょう)
を使うことをお勧めします。
--packages
まず最初に、それがうまくいったときにだけ、他の選択肢を検討してください。
使用する
spark-submit --packages
を含めることができます。
spark-sql-kafka-0-10
モジュールは以下のようになります。
spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.2.0
その他のコマンドラインオプションはお好みで入れてください。
Uber-Jarのアプローチ
すべての依存関係を含む、いわゆる
ユーバージャー
を使用するため、必ずしもうまくいくとは限りません。
META-INF
ディレクトリが処理されます。
について
kafka
データソースが動作するように (他のデータソースも一般的に)
META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
すべてのデータソースの
マージ
(ただし
replace
または
first
またはあなたが使うどんな戦略でも)。
kafka
データソースは、独自の
META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
を登録することで
org.apache.spark.sql.kafka010.KafkaSourceProvider
のデータソースプロバイダとして
kafka
形式を使用します。
関連
-
[解決済み] スパーク "プランの文字列表現が大きすぎたため、切り捨てました。" 手動で作成した集計式を使用した場合の警告
-
[解決済み] PySparkのデータフレームで、各キーのパーセンタイルはどのように計算されますか?
-
[解決済み] Spark コンテキスト 'sc' が定義されていない
-
[解決済み] Apache SparkとAkkaの比較【終了しました
-
[解決済み] Spark: 2つのDataFrameを減算する
-
[解決済み] spark 2.4.4 をインストールした後に pyspark を実行しようとすると、「TypeError: an integer is required (got type bytes)」というエラーが発生するのを修正する方法
-
[解決済み] Sparkクラスタがハートビートのタイムアウトでいっぱいになり、エグゼキュータが勝手に終了してしまう。
-
[解決済み] pyspark : NameError: name 'spark' is not defined.
-
[解決済み] 実行中のSparkアプリケーションを終了させるには?
-
[解決済み】mapとflatMapの違いと、それぞれの良い使用例について教えてください。
最新
-
nginxです。[emerg] 0.0.0.0:80 への bind() に失敗しました (98: アドレスは既に使用中です)
-
htmlページでギリシャ文字を使うには
-
ピュアhtml+cssでの要素読み込み効果
-
純粋なhtml + cssで五輪を実現するサンプルコード
-
ナビゲーションバー・ドロップダウンメニューのHTML+CSSサンプルコード
-
タイピング効果を実現するピュアhtml+css
-
htmlの選択ボックスのプレースホルダー作成に関する質問
-
html css3 伸縮しない 画像表示効果
-
トップナビゲーションバーメニュー作成用HTML+CSS
-
html+css 実装 サイバーパンク風ボタン
おすすめ
-
[解決済み] スパーク "プランの文字列表現が大きすぎたため、切り捨てました。" 手動で作成した集計式を使用した場合の警告
-
[解決済み] Spark が "java.net.URISyntaxException" を報告するのはなぜですか?DataFrameを使用する際に「java.net.URIStyntaxException: Relative path in absolute URI」と表示されるのはなぜですか?
-
[解決済み] format("kafka") で "Failed to find data source: kafka." とエラーになるのはなぜですか?(uber-jarを使用しても)失敗しますか?
-
[解決済み] sparkでsaveAsTextFileするときのファイル名の付け方は?
-
[解決済み] pyspark : NameError: name 'spark' is not defined.
-
[解決済み] Sparkのバージョンを確認する方法【終了しました
-
[解決済み】mapとflatMapの違いと、それぞれの良い使用例について教えてください。
-
[解決済み】Spark Dataframeで列の内容をすべて表示するにはどうすればよいですか?
-
[解決済み】SparkコンソールにINFOメッセージを表示させないようにするには?
-
[解決済み] 複数のテキストファイルを1つのRDDに読み込むには?