python - 天真的安装PySpark还支持S3访问

原文 标签 python amazon-web-services apache-spark amazon-s3 pyspark

Naive install of PySpark to also support S3 access

I would like to read Parquet data stored on S3 from PySpark.

I've downloaded spark from here:

http://www.apache.org/dist/spark/spark-2.1.0/spark-2.1.0-bin-hadoop2.7.tgz

And installed it to Python naively

cd python
python setup.py install

This seems to function fine and I can import pyspark, make a SparkContext, etc.. However when I go to read some publicly accessible parquet data I get the following:

import pyspark
sc = pyspark.SparkContext('local[4]')
sql = pyspark.SQLContext(sc)
df = sql.read.parquet('s3://bucket-name/mydata.parquet')

And I receive the following exception

Py4JJavaError: An error occurred while calling o55.parquet.
: java.io.IOException: No FileSystem for scheme: s3
    at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2660)
    at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2667)
    at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:94)
    at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2703)
    at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2685)
    at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:373)
    at org.apache.hadoop.fs.Path.getFileSystem(Path.java:295)
    at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$14.apply(DataSource.scala:372)
    at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$14.apply(DataSource.scala:370)
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
    at scala.collection.immutable.List.foreach(List.scala:381)
    at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
    at scala.collection.immutable.List.flatMap(List.scala:344)
    at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:370)
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:152)
    at org.apache.spark.sql.DataFrameReader.parquet(DataFrameReader.scala:441)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:280)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:214)
    at java.lang.Thread.run(Thread.java:745)

This error pops up a bit from google searches. So far none of the solutions provided have been helpful.

I'm on Linux (Ubuntu 16.04) on a personal computer without much else installed (everything is pretty stock).

Update

I downgraded to http://www.apache.org/dist/spark/spark-2.1.0/spark-2.1.0-bin-hadoop2.4.tgz to have AWS included by default.

Now unfortunately my AWS credentials aren't being picked up. I've tried a few things:

  1. Including them as SparkConf parameters

    conf = (pyspark.SparkConf()
                   .set('fs.s3.awsAccessKeyId', ...')
                   .set('fs.s3.awsSecretAccessKey', '...'))
    sc = pyspark.SparkContext('local[4]', conf=conf)
    
  2. Including them in my local .aws/credentials file
  3. Including them in the URL (doesn't work because my access key has a forward slash)

Unfortunately in all cases I receive a traceback like the following

IllegalArgumentException: 'AWS Access Key ID and Secret Access Key must be specified as the username or password (respectively) of a s3 URL, or by setting the fs.s3.awsAccessKeyId or fs.s3.awsSecretAccessKey properties (respectively).'
Answer

Using the Hadoop-2.4 build of the pre-built spark 2.X binary (which I believe ships with s3 functionality) you can programmatically configure spark to pull s3 data in the following manner:

import pyspark
conf = pyspark.SparkConf()

sc = pyspark.SparkContext('local[4]', conf=conf)
sc._jsc.hadoopConfiguration().set("fs.s3n.awsAccessKeyId", "")
sc._jsc.hadoopConfiguration().set("fs.s3n.awsSecretAccessKey", "")

sql = pyspark.SQLContext(sc)
df = sql.read.parquet('s3n://bucket-name/mydata.parquet')

A critical thing to note is the prefix s3n in both the URI for the bucket and the configuration name

翻译

我想从PySpark读取存储在S3上的Parquet数据。

我已经从这里下载了spark:

http://www.apache.org/dist/spark/spark-2.1.0/spark-2.1.0-bin-hadoop2.7.tgz


并天真地将其安装到Python

cd python
python setup.py install


这似乎运行良好,我可以导入pyspark,创建SparkContext等。但是,当我阅读一些可公开访问的镶木地板数据时,会得到以下信息:

import pyspark
sc = pyspark.SparkContext('local[4]')
sql = pyspark.SQLContext(sc)
df = sql.read.parquet('s3://bucket-name/mydata.parquet')


我收到以下异常

Py4JJavaError: An error occurred while calling o55.parquet.
: java.io.IOException: No FileSystem for scheme: s3
    at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2660)
    at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2667)
    at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:94)
    at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2703)
    at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2685)
    at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:373)
    at org.apache.hadoop.fs.Path.getFileSystem(Path.java:295)
    at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$14.apply(DataSource.scala:372)
    at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$14.apply(DataSource.scala:370)
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
    at scala.collection.immutable.List.foreach(List.scala:381)
    at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
    at scala.collection.immutable.List.flatMap(List.scala:344)
    at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:370)
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:152)
    at org.apache.spark.sql.DataFrameReader.parquet(DataFrameReader.scala:441)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:280)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:214)
    at java.lang.Thread.run(Thread.java:745)


该错误从Google搜索中弹出。到目前为止,所提供的解决方案均无济于事。

我在个人计算机上的Linux(Ubuntu 16.04)操作系统上,未安装其他软件(所有产品都足够库存)。

更新资料

我降级为http://www.apache.org/dist/spark/spark-2.1.0/spark-2.1.0-bin-hadoop2.4.tgz以默认包含AWS。

现在很不幸,我的AWS凭证没有被领取。我已经尝试了几件事:


将它们包括为SparkConf参数

conf = (pyspark.SparkConf()
               .set('fs.s3.awsAccessKeyId', ...')
               .set('fs.s3.awsSecretAccessKey', '...'))
sc = pyspark.SparkContext('local[4]', conf=conf)

将它们包括在我的本地.aws / credentials文件中
在URL中包括它们(无效,因为我的访问密钥带有正斜杠)


不幸的是,在所有情况下,我都会收到如下所示的回溯

IllegalArgumentException: 'AWS Access Key ID and Secret Access Key must be specified as the username or password (respectively) of a s3 URL, or by setting the fs.s3.awsAccessKeyId or fs.s3.awsSecretAccessKey properties (respectively).'
最佳答案
使用预先构建的spark 2.X二进制文件的Hadoop-2.4构建(我相信它具有s3功能),您可以通过编程方式配置spark以以下方式提取s3数据:

import pyspark
conf = pyspark.SparkConf()

sc = pyspark.SparkContext('local[4]', conf=conf)
sc._jsc.hadoopConfiguration().set("fs.s3n.awsAccessKeyId", "")
sc._jsc.hadoopConfiguration().set("fs.s3n.awsSecretAccessKey", "")

sql = pyspark.SQLContext(sc)
df = sql.read.parquet('s3n://bucket-name/mydata.parquet')


需要注意的关键是存储区URI和配置名称中的前缀s3n
相关推荐

python - 熊猫Groupby累计金额

python - 多层bidirectional_dynamic_rnn:与MultiRNNCell不兼容?

python - Pandas-提取基本python float的值

python - Python-获取城市人口的名称

python - 如何使用modelviewset和POST请求创建对象?

python - 魔术模拟assert_call_once vs assert_call_once_具有奇怪的行为

python - 熊猫:合并确切的ID和最近的日期

python - 缺少数据的熊猫groupby操作

python - 无法让django-star-ratings显示到模板

python - Config Kivy>反转y轴输入