Understanding SFrame, Spark's Dataframe (formerly SchemaRDD), and Adatao's DDF

User 1276 | 2/13/2015, 3:36:30 AM

I'd appreciate a (pointer to) discussion/comparison of these three methods to represent large structured data.

Thanks, Vu Ha CTO, Semantic Scholar http://www.quora.com/What-is-Semantic-Scholar-and-how-will-it-work

Comments

User 1189 | 3/17/2015, 11:46:58 PM

@vha14 The simplest difference is that SFrames are disk-backed whereas Spark DataFrames are distributed in-memory. SFrames scale to the size of data that fits on disk on a single machine (TBs easily). Spark DataFrames scale to the collective memory capacity of a cluster. We are working on distributed SFrames to allow people to seamlessly transition from single-machine in-memory, to single-machine on-disk, to distributed.

In terms of the query API, both Spark's DataFrame and Adatao's DDF behave more like SQL operators. SFrames, on the other hand, are more like Pandas DataFrames, in that an SFrame is simply a named collection of columns. Columns (which we call SArrays) are first-class objects, much like the numpy array is to a Pandas DataFrame. This provides greater freedom in the API, like the ability to easily create new SFrames as a collection of equal-length SArrays from multiple different sources, perform vectorized math directly on arbitrary SArrays (very useful for people who like to think in math), etc. SFrames are not confined to SQL-like operators. They have operators such as groupby-ArgMax, which are extremely useful but hard to express in SQL.
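For concreteness, here is a minimal sketch of what a groupby-ArgMax might look like with the SFrame API. The module name, the toy ratings table, and the exact aggregator signature are assumptions (the GraphLab Create-era API is assumed; later releases ship under different package names):

 import graphlab as gl  # module name is an assumption

 # Hypothetical ratings table
 sf = gl.SFrame({'user':   ['a', 'a', 'b', 'b'],
                 'movie':  ['m1', 'm2', 'm1', 'm3'],
                 'rating': [3, 5, 4, 2]})

 # For each user, keep the movie whose rating is the per-user maximum
 best = sf.groupby('user',
                   {'best_movie': gl.aggregate.ARGMAX('rating', 'movie')})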

Finally, since SFrames are stored on disk, they essentially also serve as native storage (much like Parquet files). This means that the loading operation is virtually free--it just opens a pointer to a table on disk.
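As a rough illustration (again assuming the GraphLab Create-era Python API; the file name is hypothetical), saving and reloading an SFrame looks roughly like this:

 import graphlab as gl  # module name is an assumption

 sf = gl.SFrame({'x': list(range(1000000))})
 sf.save('my_table.sframe')               # persisted in the native on-disk SFrame format

 sf2 = gl.load_sframe('my_table.sframe')  # cheap: opens the on-disk table, no bulk re-read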

(There are other key API differences which are harder to describe without making this reply extremely long. SFrames are the outcome of trying to design a data structure first and foremost for data science needs. For instance, we have a column type called "dict" which can basically contain JSON, and we have an operator that unpacks that dictionary column into separate columns. This allows us to easily process arbitrary JSON data without a lot of work, and is something that can be tricky to express otherwise.)
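A minimal sketch of the dict column and the unpack operator described above, with the same caveat that module name and signatures are assumptions:

 import graphlab as gl  # module name is an assumption

 # A dict-typed column can hold JSON-like records whose keys vary per row
 sf = gl.SFrame({'json': [{'a': 1, 'b': 2},
                          {'a': 3, 'c': 4.5}]})

 # Unpack the dictionary column into separate columns; missing keys become None
 wide = sf.unpack('json', column_name_prefix='')
 # wide now has columns 'a', 'b', 'c'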


User 1276 | 3/30/2015, 5:53:06 PM

This is a great answer! Thanks much Yucheng.


User 2053 | 6/14/2015, 9:53:56 PM

Somebody sent me a link to this post and asked me to reply, since he felt the existing answer was factually incorrect.

(1) Spark DataFrame is not "in-memory" only. It works perfectly fine against disk-based data, and can be used to process data much larger than the aggregate memory available.

(2) Spark DataFrame has an intermediate logical abstraction, which is more flexible than a design that is tied directly to physical execution. That is to say, the physical operation is not tied to the logical operation (e.g. it can be used against in-memory row-based data or disk-based columnar data). For example, technically I could have a version of the optimizer that generates a Dato SFrame implementation from Spark's DataFrame. Also, the example you gave about groupby-argmax can be trivially supported. I can implement it tonight and add it to Spark 1.4.1 :)
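(For illustration only, a hedged sketch of one way a groupby-argmax can already be expressed with the DataFrame API, using a max-then-join pattern; the toy table, column names, and a recent PySpark entry point are assumptions:)

 from pyspark.sql import SparkSession, functions as F

 spark = SparkSession.builder.getOrCreate()  # modern entry point; the 1.x era used SQLContext

 # Hypothetical ratings DataFrame with columns user, movie, rating
 df = spark.createDataFrame(
     [('a', 'm1', 3), ('a', 'm2', 5), ('b', 'm1', 4), ('b', 'm3', 2)],
     ['user', 'movie', 'rating'])

 # groupby-argmax via max-then-join: compute the per-user max, then keep matching rows
 maxes = df.groupBy('user').agg(F.max('rating').alias('rating'))
 argmax = df.join(maxes, on=['user', 'rating'], how='inner')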

(3) JSON data: there is native JSON data support in Spark DataFrames. Actually Spark users regularly use Spark SQL to load terabytes of JSON files, without even having to specify the schema because the system infers the schema automatically!
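For reference, loading JSON with automatic schema inference looks roughly like this (the path is hypothetical; SparkSession is the modern entry point, whereas code from the era of this thread would go through sqlContext.read.json):

 from pyspark.sql import SparkSession  # 1.x-era code would use SQLContext instead

 spark = SparkSession.builder.getOrCreate()

 # The schema is inferred from the JSON records themselves; none is declared
 df = spark.read.json("events/*.json")   # hypothetical path
 df.printSchema()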


User 1189 | 6/15/2015, 9:27:36 PM

Hi Reynolds!

That message was from 3 months ago and I am sure things have gotten better.

1) As far as I understand, a Spark DataFrame can point to on-disk data (i.e. Hive etc.), but since a degree of the query processing is performed in memory (parts of the scatter), depending on how much data I am pulling in from the disk-based database, and depending on the specifics of the operations I am running, I can still run into issues.

2) The care that has to be taken is that the physical operations must be capable of representing the space of logical operations. For instance, consider this hypothetical operation:

 a = load table from src 1 (it has 1000 rows)
 b = load table from src 2 (it has 1000 rows)
 a2 = a.filter(some condition 1)
 b2 = b.filter(some condition 2)
 # a2 and b2 coincidentally have the same number of rows (say 400 rows each)
 combine columns of (a2, b2)
 # i.e. a new table with 400 rows, containing both the columns of a2 and the columns of b2. This is not a join.
 # In a way it is equivalent to "add sequential row numbers to a2", "add sequential row numbers to b2", "join on the row number and sort".

This operation is somewhat non-relational and messy/inefficient to express on a SQL relational backend. (At least I can't think of a nice way to represent this relationally; I am very much willing to be corrected on this.) I can think of ways to do it by creating intermediate tables with implicit row numbers, but it is not so clear how to do this cleanly.
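As a rough sketch (GraphLab Create-era API and toy data assumed), the column-first style makes the hypothetical operation above a one-liner once the two filtered frames happen to have the same length:

 import graphlab as gl  # module name is an assumption

 a = gl.SFrame({'x': [1, 2, 3, 4]})
 b = gl.SFrame({'y': [10, 20, 30, 40]})

 a2 = a[a['x'] > 2]    # 2 rows after filtering
 b2 = b[b['y'] > 20]   # coincidentally also 2 rows

 # Column-wise combination of two equal-length frames: not a join,
 # just placing the columns side by side in a new SFrame
 combined = gl.SFrame({'x': a2['x'], 'y': b2['y']})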

Also because "columns are first class" we allow arbitrary modifications of columns. (The SFrame behaves more like a dictionary of columns than a table). We find this much more natural. Like: normalizing a column:

sf['y'] = sf['y'] / sf['y'].sum()

Yes, the first hypothetical operation is somewhat synthetic, but in a way, designing a logical abstraction to support multiple backends also means that the logical abstraction is restricted to the lowest common denominator of all backends.

3) Direct inference can be done meaningfully only if the JSON schema is well defined, i.e. every row has a dictionary with the same keys/values. What we have instead is a CSV parser (that can read JSON quite well, as it turns out), native dictionary and list types (both of which are "schema-free" and support arbitrarily recursive types), and operators that let you do bulk manipulation on these types (unpacking dictionaries/lists, packing them, stacking them, etc.). This is somewhat more flexible and allows us to handle some really irregular JSON schemas we have encountered.
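A hedged sketch of the bulk dict/list operators mentioned above (GraphLab Create-era names assumed; exact signatures may vary between releases):

 import graphlab as gl  # module name is an assumption

 sf = gl.SFrame({'doc':   ['d1', 'd2'],
                 'words': [{'cat': 2, 'dog': 1}, {'dog': 3}]})

 # stack: one output row per (key, value) pair of the dict column
 long_sf = sf.stack('words', new_column_name=['word', 'count'])

 # pack_columns: fold several columns into a single dict column keyed by column name
 packed = long_sf.pack_columns(columns=['word', 'count'],
                               new_column_name='packed', dtype=dict)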

(One of the stranger ones is a JSON object where one of the string fields is itself a JSON document escaped as a string. This is easy to convert with what we have, since sf['string_column'].astype(dict) performs a full "JSON-like" parse on each cell. Really, our parser handles a superset of JSON.)

For now I think our SFrame has a somewhat richer API surface area (at least for interesting types), though your DataFrame has nicer connectors as well as distributed execution. In any case, we have also added a query planner/optimizer, and this allows us to look into retargetable backends too :-) So real execution-architecture differences may start to fade over time.