Run a function in a new process

Postby gplayersv » Mon May 19, 2014 1:59 pm

Hello,

I have a function which does some stuff including:
  • adds paths to sys.path
  • sets environment variables with os.environ
Several instances of this function will run at the same time (web.py + AJAX), and they can interfere with one another because they add different things to sys.path and set the same environment variables to different values. To solve this, I think I have to run the function in a new process, or isolate it some other way, so that each instance only sees the sys.path/os.environ it sets itself.
Is this the right way to do it? Any thoughts/suggestions are welcome.

I should also mention that the function returns something and I'm interested in that data as I need it going forward.

Thanks!

Postby 7stud » Mon May 19, 2014 7:11 pm

It seems pretty straightforward: In your GET or POST function, you can use the subprocess module to spawn a new process to do your work and read the result.

Postby gplayersv » Tue May 20, 2014 9:26 am

7stud wrote: It seems pretty straightforward: In your GET or POST function, you can use the subprocess module to spawn a new process to do your work and read the result.

I don't know how to run a function with the subprocess module, not even sure that's possible. If it's possible I'd be happy to learn.

I managed to do this with the multiprocessing module. It looks something like this:
Code:
from multiprocessing import Process, Queue

def check_ping(q, device):
    _add_tp_to_path()
    from plugins.checks import check_ping
    status = check_ping.run(self.ax_handle, [device], True)
    q.put(status[device.hostname])

...

if operation == "ping":
    queue = Queue()
    p = Process(target=check_ping, args=(queue, device))
    p.start()
    p.join()
    status, msg = queue.get()
elif operation == "staf":
    ...

Unfortunately, this has slowed everything down.
Before I added multiprocessing, all the checks (~20) would start and run at the same time, and the UI would be updated through AJAX as soon as each one finished.
With multiprocessing, it seems only a few of them start at a time, and the UI is only updated after every check in a batch has finished.
I think it has to do with how multiprocessing works (serializing/deserializing, pickling/unpickling), but I don't really understand what's going on.

Postby 7stud » Tue May 20, 2014 10:23 pm

gplayersv wrote: Several instances of this function will run at the same time (web.py + AJAX) and they can interfere with one another as they add different things to sys.path and set the same environment variables with different values.

Are you saying that several instances of the function will be run by the same request? If so, I can post a multiprocess example. If not, I doubt any modern server would allow one request to change any environment variables, etc. that would affect another request. You can certainly test that out with your server.
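
One way to run that test outside the server is sketched below. It assumes the server handles requests with threads inside a single process; plain threads stand in for the request handlers, and the handler name, the DEMO_LAB variable and the paths are all made up for the illustration. Both sys.path and os.environ are process-wide, so whatever one thread changes is visible to the others.

```python
import os
import sys
import threading


def handle_request(lab, seen):
    """Hypothetical request handler that mutates process-global state."""
    os.environ["DEMO_LAB"] = lab                   # shared by all threads
    sys.path.append("/opt/demo/%s/plugins" % lab)  # shared by all threads
    seen[lab] = os.getpid()


seen = {}
threads = [threading.Thread(target=handle_request, args=(lab, seen))
           for lab in ("lab_a", "lab_b")]
for t in threads:
    t.start()
for t in threads:
    t.join()

# Both "requests" ran in the same process, so both path entries are now
# visible to every thread, and DEMO_LAB holds whichever value was written last.
same_pid = len(set(seen.values())) == 1
both_paths_visible = all("/opt/demo/%s/plugins" % lab in sys.path for lab in seen)
```

If same_pid and both_paths_visible are both true on your setup, two requests served by threads of the same process can step on each other's sys.path and environment.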

Also, read this:

http://webpy.org/cookbook/ctx

As for what's wrong with your code:


1) You are creating a new Queue for every process you start. Why do you need a Queue for one process? Queues are process safe which means you use them when you have many processes that need to access the Queue. If you have one process, then a list is equivalent to a Queue, and if the list will only contain one item, then a variable is equivalent to a list.

2) When you create a process, you are waiting for it to finish, i.e. join(), before starting another process. If you want lots of processes executing at the same time, then you have to start lots of processes at the same time.
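
Point 2 can be sketched roughly like this. It is only an illustration, written for Python 3 and assuming a POSIX system where the "fork" start method is available; check() and the host names are hypothetical stand-ins for the real checks.

```python
import multiprocessing

# Assumption: POSIX, where the "fork" start method exists.
ctx = multiprocessing.get_context("fork")


def check(device, q):
    """Hypothetical stand-in for a real check; reports (device, status, msg)."""
    q.put((device, "passed", "All good"))


def run_checks(devices):
    q = ctx.Queue()
    procs = [ctx.Process(target=check, args=(d, q)) for d in devices]
    for p in procs:   # start every process first ...
        p.start()
    results = {}
    for _ in procs:   # ... then read one result per process ...
        device, status, msg = q.get()
        results[device] = (status, msg)
    for p in procs:   # ... and only then wait for the processes to exit
        p.join()
    return results


results = run_checks(["host1", "host2", "host3"])
```

The results are read before join() because a child that has put data on a queue may not exit until that data has been consumed. Even with a single child, a plain list or variable would not bring the result back, since the child works on its own copy of the parent's memory; a Queue or Pipe is what carries the value across.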

Postby gplayersv » Wed May 21, 2014 8:59 am

7stud wrote: Are you saying that several instances of the function will be run by the same request? If so, I can post a multiprocess example. If not, I doubt any modern server would allow one request to change any environment variables, etc. that would affect another request. You can certainly test that out with your server.

Thank you for the comments 7stud.
I think I'm in over my head. Here's an overview of what is going on.

There's a web.py server, and I created a new template for it. The template contains a table in which several fields (ping status and the like) must be updated in real time. To do that, I use an AJAX call in the template; it looks like this:
Code:
<script type="text/javascript">

// getURLParameter() is defined elsewhere on the page
var lab = getURLParameter('labs');

// Fire one AJAX request per table cell that is still loading
jQuery(".op_loading").each(function() {

    var div = jQuery(this).find(">:first-child").attr('id');
    console.log(div);

    jQuery.ajax({
        type : "POST",
        async : true,
        url : "/labstatus?div_id=" + div + "&lab=" + lab,
        contentType : "application/json; charset=utf-8",
        dataType : "json",
        success : function(response) {
            console.log(response);

            // The server answers with [div_id, status, message]
            var div_id = response[0];
            var op_status = response[1];
            var message = response[2];

            if (op_status === true) {
               jQuery("#" + div_id).parent().removeClass().addClass("op_success");
            } else {
               jQuery("#" + div_id).parent().removeClass().addClass("op_fail");
            }
            jQuery("#" + div_id).parent().prop('title', message.replace(/\n/g, '<br />'));
            jQuery("#" + div_id).parent().tooltip({
                content: function() {
                    return jQuery(this).attr('title');
                }
            });
        }
    });
});
</script>

I made a new web module and mapped it to this URL (/labstatus) in urls in the web.py entry file.

Inside that web module, in its __init__, I print os.getpid(), and it's always the same. For each table cell I have to update, a new instance of this web module is created, with the same process ID. Inside it I set some environment variables, import something based on those variables, then run a check such as check ping, and return the status and message. All the checks start at the same time, and as each check finishes, it updates its table cell. Because that page can be opened by multiple users with different data, sys.path has to be set differently for each of them (it's based on the lab I pass in the URL). A stripped-down version of this web module:
Code:
# imports...

class LabStatusWebModule:

    def __init__(self):
        self.lab = web.input().lab
        self._add_tp_to_path(self.lab)
        print " =========> New instance -> %s with sys.path: %s" % (self, sys.path) # TODO:

    def _add_tp_to_path(self, lab):
        # does stuff with sys.path
        pass

    def POST(self):
        try:
            div_id = web.input().div_id
            hostname = div_id.split("___")[1]

            if operation == "ping":
                status, msg = self._check_ping(hostname)
            elif operation == "xxx":
                status, msg = self._check_xxx(hostname)
            elif operation == "yyy":
                status, msg = self._check_yyy(hostname)
            elif operation == "zzz":
                status, msg = self._check_zzz(hostname)
            else:
                raise Exception("Check '%s' not implemented" % operation)

            if status == State.PASSED:
                status = True
            else:
                status = False

            return json.dumps([div_id, status, msg])

        except Exception, ex:
            traceback.print_exc(file=sys.stdout)
            return json.dumps([div_id, False, ex.__str__()])

    def _check_ping(self, device):
        from plugins.checks import check_ping # this fails without the "self._add_tp_to_path()" from init
        status = check_ping.run(device)
        return status[device]

    def _check_xxx(self, device):
        from plugins.checks import _check_xxx # this fails without the "self._add_tp_to_path()" from init
        status = _check_xxx.run(device)
        return status[device]

    def _check_yyy(self, device):
        from plugins.checks import _check_yyy # this fails without the "self._add_tp_to_path()" from init
        status = _check_yyy.run(device)
        return status[device]

    def _check_zzz(self, device):
        from somewhere import something # this fails without the "self._add_tp_to_path()" from init
        something.setup() # adds more stuff to sys.path, adds os.environ keys

        from plugins.checks import _check_zzz # this fails without the "self._add_tp_to_path()" from init
        status = _check_zzz.run(device)
        return status[device]


To give each of these instances its own sys.path, I tried spawning each check in a new process with multiprocessing. That way sys.path doesn't get mixed up and everything works as expected, except that the checks no longer all start at the same time, even though the only thing I changed is starting the check in a new process. The AJAX call is the same; everything else is the same. Now a batch of checks (around 6) starts at the same time, and only after the whole batch has finished does the next batch start, and so on.
Stripped down, it looks like this:
Code:
# imports...

class LabStatusWebModule:
    queue = Queue()
    def __init__(self):
        self.lab = web.input().lab
        print " =========> New instance -> %s with sys.path: %s" % (self, sys.path) # TODO:

    def _add_tp_to_path(self, lab):
        # does stuff with sys.path
        pass

    def POST(self):
        try:
            div_id = web.input().div_id
            hostname = div_id.split("___")[1]

            if operation == "ping":
                p = Process(target=self._check_ping, args=(self.__class__.queue, hostname))
                p.start()
                p.join()
                status, msg = self.__class__.queue.get()
            elif operation == "xxx":
                p = Process(target=self._check_xxx, args=(self.__class__.queue, hostname))
                p.start()
                p.join()
                status, msg = self.__class__.queue.get()
            elif operation == "yyy":
                p = Process(target=self._check_yyy, args=(self.__class__.queue, hostname))
                p.start()
                p.join()
                status, msg = self.__class__.queue.get()
            elif operation == "zzz":
                p = Process(target=self._check_zzz, args=(self.__class__.queue, hostname))
                p.start()
                p.join()
                status, msg = self.__class__.queue.get()
            else:
                raise Exception("Check '%s' not implemented" % operation)

            if status == State.PASSED:
                status = True
            else:
                status = False

            return json.dumps([div_id, status, msg])

        except Exception, ex:
            traceback.print_exc(file=sys.stdout)
            return json.dumps([div_id, False, ex.__str__()])

    def _check_ping(self, q, device):
        self._add_tp_to_path(self.lab)
        from plugins.checks import check_ping # this fails without the above call
        status = check_ping.run(device)
        q.put(status[device])

    def _check_xxx(self, q, device):
        self._add_tp_to_path(self.lab)
        from plugins.checks import _check_xxx # this fails without the above call
        status = _check_xxx.run(device)
        q.put(status[device])

    def _check_yyy(self, q, device):
        self._add_tp_to_path(self.lab)
        from plugins.checks import _check_yyy # this fails without the above call
        status = _check_yyy.run(device)
        q.put(status[device])

    def _check_zzz(self, q, device):
        self._add_tp_to_path(self.lab)
        from somewhere import something # this fails without the above call
        something.setup() # adds more stuff to sys.path

        from plugins.checks import _check_zzz # this fails without the call at the start of this function
        status = _check_zzz.run(device)
        q.put(status[device])


What is put on the queue inside each check is a tuple like: ("passed", "All good").
I tried using pipes instead of queues to get the result, but the outcome is the same: not all the checks start at the same time.
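
For reference, a pipe-based variant might look roughly like the sketch below. It is written for Python 3 and assumes a POSIX system with the "fork" start method; run_in_process() is a hypothetical helper and check_ping() here is only a stand-in that sends back a fixed tuple.

```python
import multiprocessing

# Assumption: POSIX, where the "fork" start method exists.
ctx = multiprocessing.get_context("fork")


def check_ping(conn, device):
    """Hypothetical stand-in for a real check."""
    conn.send(("passed", "All good"))
    conn.close()


def run_in_process(target, device):
    # One private pipe per call: recv end for the parent, send end for the child.
    parent_conn, child_conn = ctx.Pipe(duplex=False)
    p = ctx.Process(target=target, args=(child_conn, device))
    p.start()
    child_conn.close()                 # the parent keeps only the receiving end
    status, msg = parent_conn.recv()   # blocks until the child sends its result
    p.join()
    return status, msg


status, msg = run_in_process(check_ping, "host1")
```

Creating the pipe (or queue) inside the call gives every request its own channel. With a single class-level queue shared by concurrent requests, a request can pick up a result that another request's check put there.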
The next thing I could try is to create a separate Python script that runs a check, and call that script from the POST method with os.system or something similar.

Postby gplayersv » Fri May 23, 2014 7:16 am

It works perfectly now that I've moved all the code from POST into another script and just call that script from POST with subprocess.check_output().
I'm still curious as to why it didn't work with multiprocessing, and why only a few processes could be started at the same time.
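
In case it helps someone else, the approach looks roughly like the sketch below (written for Python 3). The worker script, its file name, the DEMO_CHECK_ENV variable and the JSON-on-stdout convention are all assumptions made for the illustration, not the actual code.

```python
import json
import os
import subprocess
import sys
import tempfile

# Hypothetical worker script; the real one would set up sys.path for the lab
# and run the requested check.
WORKER_SOURCE = r'''
import json, os, sys
operation, hostname = sys.argv[1], sys.argv[2]
os.environ["DEMO_CHECK_ENV"] = "changed-in-child"   # never reaches the parent
print(json.dumps(["passed", "%s ok on %s" % (operation, hostname)]))
'''


def run_check(script_path, operation, hostname):
    """Run the worker in a fresh interpreter and decode its JSON result."""
    out = subprocess.check_output(
        [sys.executable, script_path, operation, hostname])
    status, msg = json.loads(out.decode("utf-8"))
    return status, msg


with tempfile.TemporaryDirectory() as tmp:
    script = os.path.join(tmp, "run_check.py")
    with open(script, "w") as f:
        f.write(WORKER_SOURCE)
    status, msg = run_check(script, "ping", "host1")
```

Each call gets a brand-new interpreter, so whatever the script does to sys.path or os.environ disappears when it exits, and the result comes back as text on stdout.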

Postby 7stud » Sat May 24, 2014 4:48 am

gplayersv wrote: I think I'm in over my head.

Apparently not.

I don't know if you might run into some future problem because of this:

In web2py, every HTTP request is served in its own thread. Threads are recycled for efficiency and managed by the web server. For security, the web server sets a time-out on each request. This means that actions should not run tasks that take too long, should not create new threads, and should not fork processes (it is possible but not recommended).


http://web2py.com/books/default/chapter ... background

At the link, there are some alternatives.