python - Broadcast a user defined class in Spark

Tags: python, apache-spark, pyspark

I am trying to broadcast a user-defined variable in a PySpark application, but I always get the following error:

 File "/usr/local/spark-2.1.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", line 174, in main
    process()
  File "/usr/local/spark-2.1.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", line 169, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/usr/local/spark-2.1.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/serializers.py", line 268, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "/home/.../sparkbroad.py", line 29, in <lambda>
    output = input_.map(lambda item: b.value.map(item))
  File "/usr/local/spark-2.1.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/broadcast.py", line 106, in value
    self._value = self.load(self._path)
  File "/usr/local/spark-2.1.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/broadcast.py", line 97, in load
    return pickle.load(f)

AttributeError: 'module' object has no attribute 'FooMap'

The code, in the module sparkbroad.py, is the following:

import random
import pyspark as spark

class FooMap(object):

    def __init__(self):
        keys = list(range(10))
        values = [2 * key for key in keys]
        self._map = dict(zip(keys, values))

    def map(self, value):
        if value not in self._map:
            return -1
        return self._map[value]


class FooMapJob(object):

    def __init__(self, inputs):
        self._inputs = inputs
        self._foomap = FooMap()

    def run(self):
        sc = spark.SparkContext('local', 'FooMap')
        input_ = sc.parallelize(self._inputs, 4)
        b = sc.broadcast(self._foomap)
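        # the lambda forces each worker to read b.value, i.e. to unpickle the broadcast FooMap instance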
        output = input_.map(lambda item: b.value.map(item))
        b.unpersist()
        result = list(output.toLocalIterator())
        sc.stop()
        return result


def main():
    inputs = [random.randint(0, 10) for _ in range(10)]
    job = FooMapJob(inputs)
    print(job.run())

if __name__ == '__main__':
    main()

and I am running it via:

:~$ spark-submit --master local[4] --py-files sparkbroad.py sparkbroad.py

I have added the --py-files argument, but it does not seem to make much difference. Unfortunately, I could not find any online example dealing with broadcasting complex classes (only lists or dictionaries). Any hint is appreciated. Thanks in advance.

UPDATE: after placing the FooMap class in a separate module, everything seems to work fine, even without the --py-files directive.

Answer

Placing the FooMap class in a separate module makes everything work fine. The reason is that pickle serializes an instance by reference to its defining module and class name: when FooMap is defined in the driver script itself, the workers try to look it up in their own main module and fail with the AttributeError shown above. Moving the class into a module that every worker can import lets the broadcast value be unpickled correctly.
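For illustration, a minimal sketch of that fix; the module name foomap.py is an arbitrary choice, not something from the original post:

# foomap.py - the class now lives in an importable module
class FooMap(object):

    def __init__(self):
        keys = list(range(10))
        values = [2 * key for key in keys]
        self._map = dict(zip(keys, values))

    def map(self, value):
        if value not in self._map:
            return -1
        return self._map[value]

and the driver script:

# sparkbroad.py - imports FooMap instead of defining it
import random
import pyspark as spark
from foomap import FooMap

class FooMapJob(object):

    def __init__(self, inputs):
        self._inputs = inputs
        self._foomap = FooMap()

    def run(self):
        sc = spark.SparkContext('local', 'FooMap')
        input_ = sc.parallelize(self._inputs, 4)
        b = sc.broadcast(self._foomap)
        # workers can now unpickle b.value: FooMap resolves to foomap.FooMap
        output = input_.map(lambda item: b.value.map(item))
        result = list(output.toLocalIterator())
        b.unpersist()  # release executor copies only after the action has run
        sc.stop()
        return result

def main():
    inputs = [random.randint(0, 10) for _ in range(10)]
    print(FooMapJob(inputs).run())

if __name__ == '__main__':
    main()

On a cluster you would ship the module to the executors explicitly, e.g. spark-submit --master local[4] --py-files foomap.py sparkbroad.py. In local mode the update above worked even without --py-files, presumably because the worker processes run on the same machine and can import foomap from the driver's working directory.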
