Error when running groupByKey followed by saveAsTextFile

User 2600 | 3/16/2016, 5:18:48 PM

Hi,

This worked before, and I just tried it with versions 1.8.1 to 1.8.4, but for some reason this code:

```python
conf = SparkConf().set('spark.executor.memory', '7g')
sc = SparkContext('spark://<master-node>:<port>', conf=conf)
rdd = sframe.to_rdd(sc, number_of_partitions=42)
rdd = rdd.map(lambda d: (d['key'], d['dict_of_attributes']))
rdd = rdd.groupByKey()
rdd.saveAsTextFile('hdfs://<namenode>:<port>/user/<me>/save/it/here/sframe.rdd')
```

fails with

`java.lang.Exception: Negative bytes read, error in reading SFrame`

... but the directory for the parts of the RDD is created on HDFS ...

and throws this exception in the Spark worker logs (I set logging to DEBUG and grabbed a little bit extra just for context in case it is important):

```
23:37:44.133 [Executor task launch worker-0] INFO o.a.spark.broadcast.TorrentBroadcast - Reading broadcast variable 1 took 142 ms
23:37:44.206 [Executor task launch worker-0] INFO org.apache.spark.storage.MemoryStore - ensureFreeSpace(7280) called with curMem=4669, maxMem=3889724129
23:37:44.207 [Executor task launch worker-0] INFO org.apache.spark.storage.MemoryStore - Block broadcast_1 stored as values in memory (estimated size 7.1 KB, free 3.6 GB)
23:37:44.207 [Executor task launch worker-0] DEBUG o.apache.spark.storage.BlockManager - Put block broadcast_1 locally took 70 ms
23:37:44.207 [Executor task launch worker-0] DEBUG o.apache.spark.storage.BlockManager - Putting block broadcast_1 without replication took 70 ms
23:38:01.262 [sparkExecutor-akka.actor.default-dispatcher-3] DEBUG o.a.s.r.a.AkkaRpcEnv$$anonfun$actorRef$lzycompute$1$1$$anon$1 - [actor] received message AkkaMessage(LaunchTask(org.apache.spark.util.SerializableBuffer@6da844ab),false) from Actor[akka://sparkExecutor/deadLetters]
23:38:01.262 [sparkExecutor-akka.actor.default-dispatcher-3] DEBUG o.a.s.r.a.AkkaRpcEnv$$anonfun$actorRef$lzycompute$1$1$$anon$1 - Received RPC message: AkkaMessage(LaunchTask(org.apache.spark.util.SerializableBuffer@6da844ab),false)
23:38:01.262 [sparkExecutor-akka.actor.default-dispatcher-3] INFO o.a.s.e.CoarseGrainedExecutorBackend - Got assigned task 48
23:38:01.263 [sparkExecutor-akka.actor.default-dispatcher-3] DEBUG o.a.s.r.a.AkkaRpcEnv$$anonfun$actorRef$lzycompute$1$1$$anon$1 - [actor] handled message (0.615563 ms) AkkaMessage(LaunchTask(org.apache.spark.util.SerializableBuffer@6da844ab),false) from Actor[akka://sparkExecutor/deadLetters]
23:38:01.263 [Executor task launch worker-1] INFO org.apache.spark.executor.Executor - Running task 26.1 in stage 1.0 (TID 48)
23:38:01.264 [Executor task launch worker-1] DEBUG org.apache.spark.executor.Executor - Task 48's epoch is 0
23:38:01.266 [Executor task launch worker-1] DEBUG o.apache.spark.storage.BlockManager - Getting local block broadcast_1
23:38:01.267 [Executor task launch worker-1] DEBUG o.apache.spark.storage.BlockManager - Level for block broadcast_1 is StorageLevel(true, true, false, true, 1)
23:38:01.268 [Executor task launch worker-1] DEBUG o.apache.spark.storage.BlockManager - Getting block broadcast_1 from memory
23:38:12.049 [Executor task launch worker-0] ERROR o.a.spark.api.python.PythonRunner - Python worker exited unexpectedly (crashed)
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/fs1/home/dbethune/lib/spark/spark-ucores/spark-1.5.2-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/worker.py", line 98, in main
    command = pickleSer._read_with_length(infile)
  File "/fs1/home/dbethune/lib/spark/spark-ucores/spark-1.5.2-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/serializers.py", line 156, in _read_with_length
    length = read_int(stream)
  File "/fs1/home/dbethune/lib/spark/spark-ucores/spark-1.5.2-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/serializers.py", line 545, in read_int
    raise EOFError
EOFError

    at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:166) [spark-core_2.10-1.5.2.jar:1.5.2]
    at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:207) [spark-core_2.10-1.5.2.jar:1.5.2]
```

FYI: If you are using Anaconda and having problems with NumPy

Hello everyone,

I ran into an issue a few days ago and found out something that may be affecting many GraphLab users who use it with Anaconda on Windows. NumPy was unable to load, and consequently so was everything that requires it (Matplotlib, etc.).

It turns out that the current NumPy build (1.10.4) for Windows is problematic (more info here).

Possible workarounds are downgrading to build 1.10.1 or forcing an upgrade to 1.11.0, if your dependencies allow. Downgrading was easy for me using `conda install numpy=1.10.1`.
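For reference, a quick way to confirm which NumPy build your environment actually resolves to after the downgrade or upgrade (plain Python, nothing GraphLab-specific):

```python
# Print the NumPy version the current environment picks up.
import numpy
print(numpy.__version__)  # e.g. '1.10.1' after the downgrade
```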

Thanks for your attention!

Rafael

...

2. Disable the lambda workers' shared memory and force the IPC-to-TCP fallback by setting these environment variables before importing GraphLab Create:

import os
os.environ["GRAPHLAB_DISABLE_LAMBDA_SHM"] = "1"
os.environ["GRAPHLAB_FORCE_IPC_TO_TCP_FALLBACK"] = "1"
import graphlab as gl

3. Test out your lambda worker code in this environment (a minimal smoke test is sketched after these steps). If it works, then you can make the above configuration permanent by running:

gl.sys_util.write_config_file_value("GRAPHLAB_DISABLE_LAMBDA_SHM", "1")
gl.sys_util.write_config_file_value("GRAPHLAB_FORCE_IPC_TO_TCP_FALLBACK", "1")

Note that this can be undone by setting these to "0" instead of "1", or by editing the file given by gl.sys_util.get_config_file().

4. If the lambda workers do not work after trying step 1, then there are two things you can do that would very much help us track down the issue.

4.1. First, execute the following code in a clean Python shell where you have not yet imported GraphLab Create. At the end, it prints out the path to a zip file that, if you could send it to us, will help us diagnose the issue. Please create a support ticket.
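As a reference for step 3, here is a minimal lambda-worker smoke test. This is only a sketch, assuming a standard GraphLab Create install; any Python lambda routed through the lambda workers would exercise the same code path:

```python
import os

# Workaround configuration from step 2; must be set before importing graphlab.
os.environ["GRAPHLAB_DISABLE_LAMBDA_SHM"] = "1"
os.environ["GRAPHLAB_FORCE_IPC_TO_TCP_FALLBACK"] = "1"

import graphlab as gl

# SArray.apply runs the lambda in the external lambda worker processes,
# so a correct result here means the workers are starting up properly.
sa = gl.SArray([1, 2, 3])
print(sa.apply(lambda x: x * 2))  # expect [2, 4, 6]
```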

Comments

User 1190 | 3/17/2016, 7:53:43 PM