Crash when using unstack on SFrame

User 1933 | 5/13/2015, 7:41:05 PM

I have a ~178 million row SFrame consisting of 7 columns (column types: [int,int,int,str,int,float,list]), where the final column is a variable-length list of dictionaries. Without going into too much detail, each row reflects user activity with a given webpage, and the list of dictionaries describes attributes of each page. My task is to process this frame by filtering users, generating a new column that is a transformation of the list column, and then using unstack to produce, for each user, a concatenated list of the values from that column (i.e. a list of lists). Note that the number of unique users here is ~300k.

My original approach was this:

# Keep only users with at least min_stumbles rows
stumble_counts = data.groupby('userid', gl.aggregate.COUNT)
stumble_counts = stumble_counts[stumble_counts['Count'] >= min_stumbles]
data = data.join(stumble_counts, on='userid')
# Score each row's tag list, drop rows whose score is empty or missing,
# then concatenate the scored lists per user
data['scored_tags'] = data.apply(tag_scorer)
data = data[['userid','scored_tags']].filter_by([[],None], column_name='scored_tags', exclude=True)
data = data.unstack('scored_tags', 'scored_tags_combined')

But this would regularly crash the server at the unstack step. I tried chunking the data, but that didn't work either:

stumble_counts = data.groupby('userid', gl.aggregate.COUNT)
stumble_counts = stumble_counts[stumble_counts['Count'] >= min_stumbles]
first_block = True
blocksize = 100000
idx = 0
block_n = 0
while idx < N_users:
    # Process one block of users at a time, then append the results together
    current_block = data.join(stumble_counts[idx:(blocksize + idx)], on='userid')
    current_block['scored_tags'] = current_block.apply(tag_scorer)
    current_block = current_block[['userid','scored_tags']].filter_by([[],None], column_name='scored_tags', exclude=True)
    current_block = current_block.unstack('scored_tags', 'scored_tags_combined')
    if first_block:
        combined = current_block
        first_block = False
    else:
        combined = combined.append(current_block)
    idx += blocksize
    block_n += 1

The one approach that has worked has been to significantly downsample the data by looking at only a sample (100k-200k) of users, like so:

stumble_counts = data.groupby('userid', gl.aggregate.COUNT)
stumble_counts = stumble_counts[stumble_counts['Count']>=min_stumbles]
## using sample_size = 200000
sample_users = stumble_counts.sample(fraction=sample_size/float(N_users))
sample = data.join(sample_users,on='userid')
sample['scored_tags'] = sample.apply(tag_scorer)
cleaned = sample[['userid','scored_tags']].filter_by([[],None],column_name='scored_tags',exclude=True)
combined = cleaned.unstack('scored_tags','scored_tags_combined')

I'm driving blind here, and am not sure how to isolate the issue. In most cases, the crash seems to happen when trying to do the unstack. Any ideas on how to debug?

Comments

User 1190 | 5/13/2015, 7:57:31 PM

The issue with unstack is that if some key is extremely popular, you will end up with a huge list or dictionary for that key, which is probably why the memory limit is hit.

For instance, if you unstack the edge list of the Twitter follower graph, the unstacked follower list for @jb will have 60+ million elements, while most of the rest have fewer than 10 elements.
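
A toy version of the same effect (made-up data, just to illustrate what unstack does with a skewed key):

import graphlab as gl

# One key owns almost every row; the rest have one row each.
sf = gl.SFrame({'followee': ['jb'] * 1000000 + ['u1', 'u2'],
                'follower': range(1000002)})
unstacked = sf.unstack('follower', new_column_name='followers')
# The 'jb' row now holds a single list with 1,000,000 elements, while every
# other row holds one. At tens of millions of elements for a single key,
# that one row alone can exhaust available memory.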

The idea is to have a max_size for the unstacked object. We have not built that into the API surface of unstack, but it is probably something we should do.


User 15 | 5/13/2015, 8:13:46 PM

What @"Jay Gu" is saying is a likely scenario.

As far as how to debug this further:

In your case, this is probably either an issue of running out of memory, or there's some data value that we don't handle well. From the clues in your post it definitely looks like memory, but you can quickly confirm by watching the unity_server process's memory usage while running your code.

You could also check the server log (the file path is printed when the server starts) and see whether the server appears to be crashing in the same part of the code. You may not be able to tell whether it's unstack, but we could if you send the log to us. The logs from previous runs are most likely sitting in your /tmp directory already.

As for the large-element issue Jay mentioned (the most likely culprit), try grouping by the column you unstack on and counting the rows for each key. If one count is far bigger than all the others, that is very likely the problem.
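
For example, something along these lines (reusing the data/userid names from your post):

import graphlab as gl

# How many rows does each unstack key (each user) contribute?
key_counts = data.groupby('userid', {'n_rows': gl.aggregate.COUNT()})
# If one user (or a handful) dominates, that's the likely memory culprit.
print key_counts.topk('n_rows', k=20)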

If this IS the issue, and one element is creating a dictionary that is too large for memory, I'm not sure there's a suitable workaround. SFrames don't support datasets in which a single element in a single row is larger than memory. If there are elements you don't need in the resulting dictionary, you could weed those out (drop some columns before unstack); anything that makes the output dictionary smaller helps. You could also try to complete your task without using unstack. Sorry for these troubles.

Evan


User 1933 | 5/13/2015, 8:17:20 PM

Thanks for the quick responses, guys. I have an alternative approach I'm trying now. Will post with an update if it works.


User 1933 | 5/13/2015, 8:36:27 PM

Ok, new version is running. Fingers crossed that it works. Basically, instead of relying on unstack, I handle the grouping by user at the start. Thus the new code approach is more or less:

print "Grouping by user..."
grouped = data.groupby('userid',{'concat':gl.aggregate.CONCAT("imp_rating","tags")})
print "Applying tag scoring..."
grouped['scored_tags'] = grouped.apply(tag_scorer_new)
print "Dropping any NAs"
grouped = grouped.dropna()
bow = grouped['scored_tags']
userids = grouped['userid']
return bow,userids

Then I just modify my apply function to operate on the new 'concat' column. It saves me a bunch of processing steps (I can handle all the filtering, etc. from within the distributed apply) and will hopefully let me sidestep the problems with unstack.
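
Roughly, the new scorer has this shape (simplified; the real scoring logic is omitted, and the placeholder below just pairs each tag with its rating):

def tag_scorer_new(row):
    # 'concat' is the dict of {imp_rating: tags} built by the two-argument
    # CONCAT above; walk it and build this user's scored tag list.
    scored = []
    for rating, tags in row['concat'].iteritems():
        for tag in tags:
            scored.append((tag, rating))
    return scored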

Make sense?


User 1933 | 5/13/2015, 8:37:29 PM

Wait, no. That just crashed the server too... but maybe here I can work around the issue by adjusting the number of lambda workers?
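
For reference, the knob I'd be adjusting is something like the following (the config key is the one I remember from the GraphLab Create docs; gl.get_runtime_config() should list the exact name for your version):

import graphlab as gl

# Fewer pylambda workers means fewer concurrent worker processes, and so a
# smaller peak memory footprint during apply (at the cost of speed).
gl.set_runtime_config('GRAPHLAB_DEFAULT_NUM_PYLAMBDA_WORKERS', 4)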


User 15 | 5/18/2015, 6:46:17 PM

Hi @jlorince,

Your new approach does look simpler, but using groupby-CONCAT will have a similar issue if "imp_rating" or "tags" are unbounded in size and/or one user has far more activity than most. Decreasing the number of lambda workers could conceivably help, but the server may be crashing in the groupby, in which case it would not. SFrame operations are lazy, so it can look like a later operation is the one failing when it is really failing earlier in the pipeline. You can always test this by calling SFrame.materialize() after each step, which forces all pending operations on the SFrame to execute, to see exactly which step is failing.
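
Concretely, with the pipeline from your last post, that would look something like this:

import graphlab as gl

grouped = data.groupby('userid', {'concat': gl.aggregate.CONCAT("imp_rating", "tags")})
grouped.materialize()    # did the groupby itself succeed?
grouped['scored_tags'] = grouped.apply(tag_scorer_new)
grouped.materialize()    # did the lambda apply succeed?
grouped = grouped.dropna()
grouped.materialize()    # did the dropna succeed?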

I would still try grouping by userid and aggregating a simple count. If some set of users has a really high number of activities, try filtering those specific users out and re-running your code. If it completes successfully, then we know what we're dealing with, and you can evaluate whether it's possible to do what you're trying to do without creating a potentially very large row.
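
Something like this, for example (the 100000 cutoff is just a placeholder; pick it after looking at the actual counts):

import graphlab as gl

# Count each user's activity, then drop the heaviest users and re-run.
activity = data.groupby('userid', {'n_rows': gl.aggregate.COUNT()})
heavy_users = activity[activity['n_rows'] > 100000]['userid']
trimmed = data.filter_by(heavy_users, 'userid', exclude=True)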

Hope this helps,

Evan


User 570 | 2/19/2016, 2:32:54 PM

Same problem here with unstack on Dato-Core open source.

When running unstack on a large dataset (~1B records), memory utilization quickly grows to 95% (the server has 256 GB of RAM) and then stalls forever (it doesn't crash, though). I've filtered out the keys with the largest counts of the unstack variable, but that didn't help. It works fine on smaller files. Maybe it's something to do with configuration settings?


User 12 | 2/19/2016, 10:29:09 PM

Hi MindisZ, can you send me (brian at dato) the log file that's generated when the SFrame engine starts? I'll see if I can find some hints in there about what's going wrong.

Thanks, Brian