Parallel tasks

User 912 | 1/26/2015, 2:15:20 PM

Hello,

I have a question about running parallel tasks with the call "graphlab.deploy.parallel_for_each()". Every time I run it, I get a long stack trace that ends with this error:

<pre class="CodeBlock"><code>File "cygraph.pyx", line 28, in graphlab.cython.cygraph.UnityGraphProxy.__cinit__
TypeError: __cinit__() takes at least 1 positional argument (0 given)
</code></pre>

Here's my code for the task and job:

<pre class="CodeBlock"><code>temptask = graphlab.deploy.Task("Test task")
temptask.set_code(findPaths)
temptask.set_params(['graph', 'start'])

# graph78 is an already stored SGraph and 'start' is a vertex id
params = [{'graph': graph78, 'start': 47}, {'graph': graph78, 'start': 63}]

tempjob = graphlab.deploy.parallel_for_each(temptask, params)
</code></pre>

And the functions:

<pre class="CodeBlock"><code>def findPaths(task):
    graph = task.params['graph']
    start = task.params['start']
    listId = findPathsIds(graph, start)
    ....

def findPathsIds(graph, start, degree=5, path=[]):
    ....
</code></pre>

Basically, I have code that finds all paths from a specific node in a graph, up to a specific length, and I want to run tasks in parallel to find all paths from all nodes much faster. For now I just want to test the parallel tasks by running them on only two of the nodes. I can't tell from the stack trace whether I am initializing the task and job incorrectly. Can you see anything wrong there? Or maybe the problem is that the function each task runs calls another function? I am not sure whether the second function also has to take "task" as an argument.

I apologize in advance: I am a beginner with creating tasks and jobs, and I may still be a bit confused about how they work.

Comments

User 398 | 1/26/2015, 7:20:31 PM

Hi there,

Thanks for using GraphLab Create. Can you send me the code for your findPathsIds function? It will help me diagnose the issue here.

Best, Robert


User 912 | 1/26/2015, 7:39:19 PM

Hi Robert,

Here is the code for the two functions I am using:

<pre class="CodeBlock"><code>def findPathsIds(graph, start, degree=5, path=[]):
    # Build a new list so the shared default argument is never mutated.
    path = path + [start]

    if degree == 1:
        return [path]

    paths = []

    edges = graph.get_edges()

    for e in edges:
        # Follow only outgoing edges of `start`, ignoring self-loops.
        if e['__src_id'] == start and e['__dst_id'] != start:
            idn = e['__dst_id']

            if path not in paths:
                paths += [path]

            # Skip vertices already on the path to avoid cycles.
            if idn not in path:
                newpaths = findPathsIds(graph, idn, degree-1, path)

                for newp in newpaths:
                    paths += [newp]

    return paths

def findPaths(task):
    graph = task.params['graph']
    start = task.params['start']

    listId = findPathsIds(graph, start)

    v = graph.get_vertices()
    paths = dict()

    for p in listId:
        path = ""

        # Concatenate the 'elem' value of every vertex on the path.
        for idn in p:
            path += v[v['__id'] == idn]['elem'][0]

        if paths.has_key(path):
            paths[path] += 1
        else:
            paths[path] = 1

    print paths
</code></pre>

Thank you for the help!
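
For readers following along, the recursion above can be tried without GraphLab. The sketch below is a plain-Python approximation, not the poster's exact code: it assumes the edges are already available as (source, destination) pairs, and the names `build_adjacency` and `find_path_ids` are hypothetical. It mirrors the posted logic (a path is recorded when its last vertex has an outgoing edge, or when the depth limit is reached), but groups the edges by source once instead of scanning every edge on each recursive call.

```python
from collections import defaultdict


def build_adjacency(edges):
    """Map each source id to its destination ids, dropping self-loops.

    `edges` is assumed to be an iterable of (src, dst) pairs.
    """
    adjacency = defaultdict(list)
    for src, dst in edges:
        if src != dst:
            adjacency[src].append(dst)
    return adjacency


def find_path_ids(adjacency, start, degree=5, path=()):
    """Enumerate cycle-free paths from `start` with at most `degree` vertices."""
    path = path + (start,)  # tuples are immutable, so the default is safe
    if degree == 1:
        return [list(path)]

    paths = []
    neighbours = adjacency.get(start, [])
    if neighbours:
        # As in the posted code, record the current path once
        # when the vertex has at least one outgoing edge.
        paths.append(list(path))
    for nxt in neighbours:
        if nxt not in path:  # skip vertices already on the path
            paths.extend(find_path_ids(adjacency, nxt, degree - 1, path))
    return paths


example_edges = [(1, 2), (2, 3), (1, 3), (3, 1)]
example_paths = find_path_ids(build_adjacency(example_edges), 1, degree=3)
```

Grouping the edges up front means each recursive call only looks at the neighbours of the current vertex, which matters once the graph has more than a handful of edges.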


User 398 | 1/26/2015, 9:40:01 PM

The problem is that you're referring to SGraph instances in your params. We don't support that at the moment (but will in our next release). As a workaround, you can save your graphs and then pass the file paths in your params dictionaries (rather than references to SGraph instances). You can refer to SGraph files saved locally or in S3. For example:

<pre class="CodeBlock"><code> params = [{'graph': '/data/graph1', 'start': 47}, {'graph': '/data/graph2', 'start': 63}] </code></pre>

And then you would need to slightly modify your function to load the SGraph from the specified file:

<pre class="CodeBlock"><code>import graphlab as gl

def findPaths(task):
    # Load the SGraph from the path passed in the task parameters.
    graph = gl.load_sgraph(task.params['graph'])
    start = task.params['start']

    listId = findPathsIds(graph, start)

    v = graph.get_vertices()
    paths = dict()

    for p in listId:
        path = ""

        for idn in p:
            path += v[v['__id'] == idn]['elem'][0]

        if paths.has_key(path):
            paths[path] += 1
        else:
            paths[path] = 1

    print paths
</code></pre>
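
Since the original goal was to start from every node of one graph, the params list for this workaround could be generated rather than typed out. The following is only a sketch: `build_params` is a hypothetical helper, the path `/data/graph1` is taken from the example above, and it assumes the graph has already been saved to that location. Every value in the resulting dictionaries is a plain string or integer, with no SGraph reference.

```python
def build_params(graph_path, start_ids):
    """Return one parameter dict per start vertex, all pointing at one saved graph.

    `graph_path` is assumed to be the location of an already saved SGraph;
    `start_ids` is any iterable of vertex ids.
    """
    return [{'graph': graph_path, 'start': start} for start in start_ids]


# Two start vertices, as in the original test run.
params = build_params('/data/graph1', [47, 63])
```

The resulting list has the same shape as the hand-written params above, so it can be passed to the parallel call in the same way.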


User 912 | 1/27/2015, 8:04:13 PM

Thank you so very much! That helped.

Just one last question.. The functions that my tasks run are usually in python files in the current directory I work in, so when I add the code to the task in a terminal, it usually looks like this:

<pre class="CodeBlock"><code>import functions
...
temptask.set_code(functions.findPaths)
</code></pre>

If I run a single task, it works. However, when executing a job with that task, the job doesn't recognize the 'functions' file - it gives me an exception: No module named functions. Do you know how I can fix this?

Thank you once again for the help so far.


User 398 | 1/27/2015, 10:28:50 PM

You can actually specify a src filename parameter with the set_code method. You should be able to do something like this:

<code class="CodeInline">temptask.set_code("functions.py")</code>

It will then load the functions from that module, along with any other Python files in the same directory. Give this a shot and let me know if it works.


User 912 | 1/28/2015, 6:04:38 PM

Yes, it works! I just had to rename the main function for the task in the module to "run", so that the task will recognize it, and it finally worked! It even works with parallel tasks, which was my main focus..
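
For anyone with the same problem, here is a rough sketch of how such a module might be laid out, with helpers next to a `run(task)` entry point. Only the name `run` and the `task.params` access come from this thread. Everything else is an assumption for illustration: the `FakeTask` stand-in, the `count_path_labels` helper, and the `id_paths` and `labels` parameters are hypothetical, and whether the framework uses the return value is not something I have confirmed.

```python
# functions.py: hypothetical layout for a module handed to set_code("functions.py")


def count_path_labels(id_paths, labels):
    """Count how often each concatenated label string occurs among the id paths."""
    counts = {}
    for id_path in id_paths:
        key = "".join(labels[vertex_id] for vertex_id in id_path)
        counts[key] = counts.get(key, 0) + 1
    return counts


def run(task):
    """Entry point; per this thread, the task looks for a function named `run`."""
    counts = count_path_labels(task.params['id_paths'], task.params['labels'])
    print(counts)
    return counts  # returned only so the function can be checked locally


class FakeTask(object):
    """Stand-in for local testing that exposes only a `params` attribute."""

    def __init__(self, params):
        self.params = params


local_result = run(FakeTask({
    'id_paths': [[1, 2], [1, 2], [3]],
    'labels': {1: 'a', 2: 'b', 3: 'c'},
}))
```

Calling `run` with a stand-in object like this is a quick way to check the module on its own before handing it to a job.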

Thank you once again for the time and help, this was really important to me!

Have a nice day, Vyara