Are SArrays thread safe for reads?

User 2032 | 9/21/2015, 2:29:50 PM

Hi there,

I need to load large SArrays of dicts into Aerospike. Unfortunately, their Python client does not support batches or pipelining, so from a single thread I can load only one record at a time, which results in a prohibitively slow 300 TPS.

I came up with something like the code below, but I'm not sure whether I have to materialize temporary copies of the SArray chunks or can safely pass direct SArray slices to multiple threads:

def chunk_sf(sf, chunk_size):
    max_ = len(sf)
    if not max_:
        return
    # ceil(max_ / chunk_size) pages, so an exact multiple of chunk_size
    # does not produce a trailing empty chunk
    num_pages = (max_ + chunk_size - 1) // chunk_size
    for page in xrange(num_pages):
        # inclusive start
        start = chunk_size * page
        # exclusive end, as in Python slicing
        end = chunk_size * (page + 1)
        yield (start, min(end, max_))
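
For reference, the generator yields half-open (start, end) pairs, e.g. for 10 rows and a chunk size of 4:

>>> list(chunk_sf(range(10), 4))
[(0, 4), (4, 8), (8, 10)]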

def insert_batch(config, sa, key_column, namespace, aerospike_set, clean_record):
    client = aerospike.client(config).connect()
    try:
        results = []
        log = logging.getLogger(__name__)
        for row in sa:
            key_tuple = (namespace, aerospike_set, row[key_column])

            if clean_record:
                # exists() returns (key, meta); meta is None when the
                # record is absent
                _, meta = client.exists(key_tuple)
                if meta is not None:
                    client.remove(key_tuple)

            try:
                client.put(key_tuple, row)
            except Exception:
                log.error("Aerospike insert failed on client.put({}, {})".format(key_tuple, row), exc_info=True)

        return results
    finally:
        client.close()

mapped_rows = sf.apply(map_row) # gives an SArray of dicts

log.debug("Sending {} rows to aerospike".format(len(mapped_rows)))

with concurrent.futures.ThreadPoolExecutor(workers) as executor:
    for start, end in chunk_sf(mapped_rows, batch_size * workers):
        part = mapped_rows[start:end]
        futures = [
            executor.submit(
                insert_batch,
                worker_aerospike_config,
                [i for i in part[start:end]],  # <------ Line in question
                # part[start:end],  # <--------- Possible replacement?
                mapper_to[key_column],
                namespace,
                aerospike_set,
                clean_record
            )
            for start, end
            in chunk_sf(part, min(batch_size, len(part) // workers or len(part)))]

        concurrent.futures.wait(futures)
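
In other words, if SArray reads are thread-safe, I could presumably drop the list comprehension and pass the lazy slice straight through (hypothetical simplification):

executor.submit(
    insert_batch,
    worker_aerospike_config,
    part[start:end],  # direct SArray slice, no materialization
    mapper_to[key_column],
    namespace,
    aerospike_set,
    clean_record
)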

Comments

User 1178 | 9/21/2015, 3:41:17 PM

Hi Jonny,

GraphLab Create objects are usually immutable, so accessing this object from multiple threads should not be a problem.

Thanks! Ping


User 2032 | 9/21/2015, 3:43:44 PM

Hi Ping,

Thanks for the answer!:)

A follow-up question - are you planning to support SFrame uploads to low-latency databases such as Redis Labs, Aerospike, Couchbase, or Riak?


User 1178 | 9/21/2015, 11:37:06 PM

Hi Johnny,

We currently support taking data in and out of SFrame via ODBC data sources, but we do not have built-in support for specific databases. You may have to use a database-specific way of ingesting data.
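
For example, the ODBC path looks roughly like this (a sketch from memory - please check the API docs for the exact signatures; the DSN and table names here are made up):

import graphlab as gl

# Sketch from memory; consult the GraphLab Create API docs for exact usage.
db = gl.connect_odbc("DSN=my_dsn;UID=user;PWD=secret")

# Read a query result into an SFrame, and write an SFrame back to a table.
sf = gl.SFrame.from_odbc(db, "SELECT * FROM my_table")
sf.to_odbc(db, "my_output_table")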

What kind of usage scenario do you have in mind? Your use case will help us prioritize the next set of features we offer.

Thanks! Ping


User 2032 | 9/22/2015, 5:09:50 PM

We do feature creation in GraphLab Create and need to put the results in Aerospike to be retrieved by our prediction algorithms. We need sub-millisecond retrieval latencies, which unfortunately the GraphLab prediction service cannot provide right now. So basically I have an SFrame with a few hundred thousand rows that I need to push to Aerospike every few minutes. This will soon be many more rows, so the save throughput will become a concern.

I think that at the C level GraphLab could do this much more efficiently by leveraging the Aerospike C client. Unfortunately we don't have the resources right now to code it ourselves, but maybe you could give it a try. In general, fast batch loading into popular fast-access DBs would make GraphLab a much more production-ready tool. Also, ODBC is a real pain if you have complex data types - see my post about corrupted JSON and array data.


User 15 | 9/22/2015, 6:36:40 PM

Hi Jan,

If you don't mind me asking, what performance are you getting from your parallel import solution to Aerospike?

After reading a bit about Aerospike, it doesn't appear that the server even supports batch inserts; the client APIs that offer one just call the single insert in a loop, like you do. So even if we supported Aerospike's C client, batching alone probably wouldn't buy you any performance. Perhaps that client is more performant for other reasons, but that's purely a guess.

A next step might be to run some microbenchmarks comparing single-record inserts through the Python client and the C client, to see if it's even worth thinking about.
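
Something like this rough sketch would give a baseline number for the Python client (assumes a local Aerospike at 127.0.0.1:3000 with a 'test' namespace; adjust to taste):

import time
import aerospike

config = {'hosts': [('127.0.0.1', 3000)]}
client = aerospike.client(config).connect()

record = {'bin_%d' % i: i for i in xrange(10)}  # toy record
n = 10000

start = time.time()
for i in xrange(n):
    client.put(('test', 'bench', 'key_%d' % i), record)
elapsed = time.time() - start

print('%.0f single-record puts/sec' % (n / elapsed))
client.close()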


User 2032 | 9/23/2015, 8:49:54 AM

Hi Evan,

On localhost I was getting up to 30K TPS with a ProcessPoolExecutor and 50 workers. With a thread pool and a remote host, the most I'm getting is ca. 2K TPS. This is for records with ca. 100 bins. If there were a batch method for Aerospike I would definitely use it; I was even considering switching to Couchbase because of the lack of one.

My hunch is that the way to approach it in GraphLab is to have a large pool of (400?) threads, each with its own client. In Python this approach does not yield the best results, and I presume this is because of the GIL, but perhaps it would not be such a bad idea in C++. This is Aerospike's recommended approach for batch loading with the Java client. However, the Java client is async and the C one is not.
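
For reference, the process-based variant that hit 30K TPS looks roughly like this (a sketch; chunks are materialized with list() on the assumption that lazy SArray slices cannot be pickled across process boundaries):

import concurrent.futures

# insert_batch and chunk_sf as defined in my first post; each worker
# process opens its own aerospike client inside insert_batch.
with concurrent.futures.ProcessPoolExecutor(max_workers=50) as executor:
    futures = [
        executor.submit(
            insert_batch,
            worker_aerospike_config,
            list(mapped_rows[start:end]),  # plain list of dicts, picklable
            mapper_to[key_column],
            namespace,
            aerospike_set,
            clean_record
        )
        for start, end in chunk_sf(mapped_rows, batch_size)
    ]
    concurrent.futures.wait(futures)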


User 2299 | 9/23/2015, 5:07:54 PM

Hi JohnnyM, Brian from Aerospike here.

At this time, you're on the right track for faster loading into Aerospike - extra threads - which are a pain in Python. If you can push the data out to a file and then use a different language (like Java), you can use some of our loader tools, which are parallel and fast. A pain, I know, I almost hate to mention it, but I can provide a pointer.

However, we are currently adding batch writes to Aerospike, along with proper async capabilities across the board - I am looking right now at the guy implementing it. If you're interested in contacting me directly at brian@aerospike.com, I'd be happy to see if I can get you an early release, or give you some guidance on dates.

Thanks for using Aerospike! We think Dato and Aerospike work well together.