How to process DynamoDB Stream in a Spark streaming application

Tags: apache-spark amazon-dynamodb amazon-kinesis

I would like to consume a DynamoDB Stream from a Spark Streaming application.

Spark Streaming uses the KCL (Kinesis Client Library) to read from Kinesis. There is a library that lets the KCL read from a DynamoDB Stream: dynamodb-streams-kinesis-adapter.
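Wiring these two together means having both artifacts on the classpath. A plausible sbt setup might look like the following (the version numbers are assumptions based on the Spark versions mentioned in this thread; check for current releases of both artifacts):

```scala
// build.sbt (sketch; adjust versions to match your Spark distribution)
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-streaming"             % "2.4.0" % Provided,
  "org.apache.spark" %% "spark-streaming-kinesis-asl" % "2.4.0",
  "com.amazonaws"    %  "dynamodb-streams-kinesis-adapter" % "1.5.0"
)
```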

But is it possible to plug this library into Spark? Has anyone done this?

I'm using Spark 2.1.0.

My backup plan is to have another app read from the DynamoDB stream into a Kinesis stream.

Thanks

Answer

The way to do this is to modify KinesisInputDStream to use the worker provided by dynamodb-streams-kinesis-adapter. The official guidelines suggest something like this:

```java
final Worker worker = StreamsWorkerFactory.createDynamoDbStreamsWorker(
    recordProcessorFactory,
    workerConfig,
    adapterClient,
    amazonDynamoDB,
    amazonCloudWatchClient);
```
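The `recordProcessorFactory` passed to that factory method is not shown in the guidelines. A minimal sketch of one against the KCL v1 interfaces might look like this (the logging and checkpointing behavior here is illustrative, not something the adapter prescribes):

```scala
import java.nio.charset.StandardCharsets
import com.amazonaws.services.kinesis.clientlibrary.interfaces.{
  IRecordProcessor, IRecordProcessorCheckpointer, IRecordProcessorFactory
}
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason
import com.amazonaws.services.kinesis.model.Record

// Minimal processor: print each change record's payload, then checkpoint.
class StreamsRecordProcessor extends IRecordProcessor {
  override def initialize(shardId: String): Unit =
    println(s"Initialized for shard $shardId")

  override def processRecords(records: java.util.List[Record],
                              checkpointer: IRecordProcessorCheckpointer): Unit = {
    records.forEach { record =>
      // Through the adapter, the payload is the DynamoDB change record as JSON.
      val json = StandardCharsets.UTF_8.decode(record.getData).toString
      println(json)
    }
    checkpointer.checkpoint()
  }

  override def shutdown(checkpointer: IRecordProcessorCheckpointer,
                        reason: ShutdownReason): Unit =
    if (reason == ShutdownReason.TERMINATE) checkpointer.checkpoint()
}

val recordProcessorFactory = new IRecordProcessorFactory {
  override def createProcessor(): IRecordProcessor = new StreamsRecordProcessor()
}
```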

From Spark's perspective, this is implemented in KinesisInputDStream.scala under the kinesis-asl module.

I have tried this with Spark 2.4.0. Here is my repo; it needs a little refining but gets the job done:

https://github.com/ravi72munde/spark-dynamo-stream-asl

After modifying KinesisInputDStream, we can use it as shown below.

```scala
val stream = KinesisInputDStream.builder
  .streamingContext(ssc)
  .streamName("sample-tablename-2")
  .regionName("us-east-1")
  .initialPosition(new Latest())
  .checkpointAppName("sample-app")
  .checkpointInterval(Milliseconds(100))
  .storageLevel(StorageLevel.MEMORY_AND_DISK_2)
  .build()
```
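Each element of the resulting DStream is the raw record payload as bytes, which with the DynamoDB Streams adapter is the JSON form of the change event (INSERT / MODIFY / REMOVE). A sketch of decoding it on the Spark side (the `eventNameOf` helper below is a hypothetical illustration using a regex; in practice you would use a real JSON parser):

```scala
import java.nio.charset.StandardCharsets

// Decode the raw record payload into the change record's JSON text.
def decodePayload(bytes: Array[Byte]): String =
  new String(bytes, StandardCharsets.UTF_8)

// Hypothetical helper: extract the eventName field (INSERT / MODIFY / REMOVE)
// with a simple regex -- good enough for a sketch, not for production.
def eventNameOf(json: String): Option[String] = {
  val pattern = "\"eventName\"\\s*:\\s*\"([A-Z]+)\"".r
  pattern.findFirstMatchIn(json).map(_.group(1))
}

// Applied to the DStream built above, this might look like:
//   stream.map(decodePayload).foreachRDD { rdd =>
//     rdd.flatMap(eventNameOf).countByValue().foreach(println)
//   }
```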
