Nonblocking sockets

A place where you can post Python-related tutorials you made yourself, or links to tutorials made by others.

Nonblocking sockets

Postby mouseroot » Sat Feb 16, 2013 2:15 am

Code: Select all
def main():
    import socket,select

    #Standard TCP socket
    server = socket.socket(socket.AF_INET,socket.SOCK_STREAM)

    # by using an empty string we are saying any you could also use
    # localhost or socket.gethostname() if you want to expose the server
    # past your network granted you have the correct ports fowarded
    server.bind(("",2500))

    # tell the socket to listen for incoming connections
    server.listen(10)

    # tell the socket not to block while waiting for connections
    server.setblocking(False)
    input = [server]
    while 1:
        input_ready,output_ready,errors = select.select(input, [], [])
       
        for sock in input_ready:
            if sock is server:
                client,address = sock.accept()
                print "Accepting socket from",address[0]
                input.append(client)
            else:
                data = sock.recv(1024)
                if data:
                    print data
                else:
                    sock.close()
                    input.remove(sock)
                    print "Dropped connection",address[0]

if __name__ == "__main__":
    print "Nonblocking sockets example!"
    print "Listening on port 2500"
    main()



First we import socket and select

next we create a socket object and bind it to port 2500 and tell it to listen for up to
10 clients and then tell the server socket to not block (Asynchronous)
and we store the server object into a list(or array)

then we enter the main application loop
and we pass 3 uninitialized variables that get populated by calling the select.select
and passing in the inputs,outputs and error lists

the select.select function asynchronously puts incoming sockets,ready to read sockets
and ready to write sockets in the corresponding uninitialized lists
next we use a "for ... in" loop to loop through the ready_inputs(which is sockets that are
waiting to be accepted)
we then check if the socket in the loop is the same as the server(then we run accept method)
and store that socket into the inputs array
at this point we return to the select.select function and check for ready,inputs and outputs

now if a client that is already connected sends data we skip to the sock.recv() and pull 1024
bytes and print it

Hope this kind of sums it up.
-mouseroot
Last edited by mouseroot on Sun Feb 17, 2013 7:33 am, edited 2 times in total.
mouseroot
 
Posts: 17
Joined: Sat Feb 16, 2013 1:14 am

Re: Nonblocking sockets

Postby micseydel » Sat Feb 16, 2013 3:57 am

Did you have a question?
Join the #python-forum IRC channel on irc.freenode.net!
User avatar
micseydel
 
Posts: 1116
Joined: Tue Feb 12, 2013 2:18 am
Location: Mountain View, CA

Re: Nonblocking sockets

Postby mouseroot » Sun Feb 17, 2013 1:01 am

No just wanted to post an example on how nonblocking sockets work
mouseroot
 
Posts: 17
Joined: Sat Feb 16, 2013 1:14 am

Re: Nonblocking sockets

Postby stranac » Sun Feb 17, 2013 1:17 am

mouseroot wrote:No just wanted to post an example on how nonblocking sockets work

In that case, you should have posted in the Scripts section.
I'm gonna move the topic there, and you should consider adding a short description alongside the code.
Friendship is magic!

R.I.P. Tracy M. You will be missed.
User avatar
stranac
 
Posts: 1093
Joined: Thu Feb 07, 2013 3:42 pm

Re: Nonblocking sockets

Postby bunburya » Sun Feb 17, 2013 9:22 pm

The tutorials section might be even better, since the purpose of the post seems to be education rather than providing a useful script.

BTW, thanks for posting this mouseroot.
http://www.bunburya.eu | https://github.com/bunburya

Join the #python-forum IRC channel on irc.freenode.net!
bunburya
 
Posts: 9
Joined: Sat Feb 09, 2013 7:28 pm
Location: Dublin, Ireland

Re: Nonblocking sockets

Postby stranac » Sun Feb 17, 2013 9:34 pm

Yeah, I think you're right, bunburya.

I didn't really take a close look at the script, and there was no other text, so I just assumed he was sharing a script.
But like this, it's obvious it's actually meant for helping others.

So moving it to Tutorials.
Friendship is magic!

R.I.P. Tracy M. You will be missed.
User avatar
stranac
 
Posts: 1093
Joined: Thu Feb 07, 2013 3:42 pm

Re: Nonblocking sockets

Postby mouseroot » Mon Feb 18, 2013 1:24 am

sometimes non-blocking sockets can be a pain to comprehend especially if you
havent played with them in a long time

I hope I broke it down enough to understand
one thing you have to keep in mind is the select
module will only select sockets on windows
however on *nix and macs you can also use this
with file descriptors and named pipes(files)
mouseroot
 
Posts: 17
Joined: Sat Feb 16, 2013 1:14 am

Re: Nonblocking sockets

Postby Grinch » Wed Sep 04, 2013 2:10 pm

Nice example and explanation!
Now I need a little step about writing back to the client.
To be more clear I need to setup a simple protocol to exchange command and reply between the server and the client.
To accomplish this task, when I've discovered a valid command from the client, I need to send an acknowledge string as responce.
In my example the protocol is very simple and ASCII based, the start command is $, the terminator is %.
The command $QUIT% stop the server and the application can exit.
To support multithreading programming I've included the originally code in a thread, here below the full code:

Code: Select all
# ----------------------------------------------------------------
# TCP Server with non blocking code (thread)
# ----------------------------------------------------------------
# Author : powermos
# Date   : 02-09-2013
# Version: 1.0
# telnet 192.168.2.126 2626
# ----------------------------------------------------------------

import os
# ----------------------------------------------------------------
# TCP Server with non blocking code (thread)
# ----------------------------------------------------------------
# Author : powermos
# Date   : 02-09-2013
# Version: 1.0
# telnet 192.168.2.126 2626
# ----------------------------------------------------------------

import sys
import time
import socket
import select
import threading

# ----------------------------------------------------------------
# GLOBAL VARIABLES
# ----------------------------------------------------------------
TCPServerCommand = ""   # TCP: TCP command buffer (after succesfully decoding the input data stream)


class TCPThreadedServer(threading.Thread):
   def __init__(self, TCPPort):
      threading.Thread.__init__(self)
      self.TCPPort = TCPPort
      self.stopflag = 0
      
   def run(self):
      global TCPServerCommand
      buffer = ""
      receivedData = ""
      
      #Standard TCP socket
      server = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
      server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR,1)

      # by using an empty string we are saying any you could also use
      # localhost or socket.gethostname() if you want to expose the server
      # past your network granted you have the correct ports fowarded
      server.bind(("",self.TCPPort))

      # tell the socket to listen for incoming connections
      server.listen(2)

      # tell the socket not to block while waiting for connections
      server.setblocking(False)
      input = [server]
      
      while not(self.stopflag):
         input_ready,output_ready,errors = select.select(input, [], [])
         for sock in input_ready:
            if sock is server:
                  client,address = sock.accept()
                  print "Accepting socket from",address[0]
                  input.append(client)
            else:
               receivedData = sock.recv(1)
               if receivedData:
                  # Read 1 byte from the incoming receivedData buffer
                  # Check the receivedData
                  if (receivedData == "$"):
                     buffer = "$"   # Found the command start, reset the buffer
                  elif ( (receivedData <> "!") and (receivedData <> "%") ):
                     buffer += receivedData
                  elif ( (receivedData == "!") or (receivedData == "%") ):
                     buffer += receivedData
                     # Copy the valid command into the global buffer
                     TCPServerCommand = buffer
                     # Send back ACK to client
                     sock.send("OK"+chr(0x0D)+chr(0x0A))
                     print("Command --> " + buffer)
                     # Check if is a termination request
                     if (buffer == "$QUIT%"):
                        sock.close()
                        input.remove(sock)
                        self.stopflag = 1
               else:
                  sock.close()
                  input.remove(sock)
                  print "Dropped connection",address[0]
                  
      print ("Exit from run")
      return
   
   def stopserver(self):
      print ("(quitthread) Server shutdown!")
      self.stopflag = 1
      return

if __name__ == "__main__":

   print "Nonblocking sockets example!"
   print "Listening on port 2626"
   
   try:
      # Setup a TCP server
      TCPserver = TCPThreadedServer(2626)
      TCPserver.setDaemon(True)
      TCPserver.start()
         
      # initialize the command buffer input
      TCPServerCommand = ""
      
      # Infinite loop (keep alive the task)
      while 1:
         if (TCPServerCommand != ""):
            print("New command! " + TCPServerCommand)
            if (TCPServerCommand == "$QUIT%"): break
            TCPServerCommand = ""
         pass
         
   except (KeyboardInterrupt, SystemExit):
      print "(main) Exception detected"
      TCPserver.stopserver()
   finally:
      time.sleep(1)
      print "(main) Exiting from main program"
      sys.exit()


This code is running into a Raspberry PI embedded computer with Python 2.7.3 installed.
To check the code I'm using a telnet session in a WinXP OS.
All the code seems working but I need to ask expert for some infos (I've started to learn Python from some weeks).

1. First I need to accept only 1 connection and block any other attempt by sending back to the client a message like "NOT ALLOWED!"
2. I don't know if my way is totally right and clean to send back to the client a acknowledge on a succesfully command received, I'm using the sock.send command but I've a suspect that I've first to check if the socket is ready for write before attempt to write on it, could someone give me some idea and suggest?

Hope someone can give me some support on the subject.

Thank in advance!

Powermos
Grinch
 
Posts: 10
Joined: Wed Sep 04, 2013 10:00 am

Re: Nonblocking sockets

Postby Grinch » Thu Sep 05, 2013 6:58 am

Dear all,
as update to my previous post I'm trying to code a simple login access manager.
This code will be runned on a Raspberry PI and I need to protect from unauthorized access.
That's why I decided to create an authentication mechanism based on the protocol described above.

The authentication command could be the following:
$LOGIN:USER-PASS%

the system should therefore:
- wait for incoming connection and accept the connection
- check if the USER and PASS are valid and which privilege level assign (ADMIN or GUEST)
- store them in a list containing (IP address), (USER), (ADMIN or GUEST)
- once the user has been verified all subsequent commands will be placed in a queue and processed by a second thread

With this simple system we can grant right access to my board and also by using the protocol we can did several action like list the dir file or trasfer a file or if the privilege is ADMIN, write and delete file.

Thank in advance anyone who wants to dedicate some spare time to this project.

Obviously all my code will be left free for anyone.

Bye for now.

Powermos
Grinch
 
Posts: 10
Joined: Wed Sep 04, 2013 10:00 am

Re: Nonblocking sockets

Postby Grinch » Fri Sep 06, 2013 11:51 am

Dear all,
I've did a little update to my code and do a clean up of the code itself.
At now I've written a simple parser that from a thread read the command queue that is filled by the TCP server and send back to the client a ACK or NACK message.
Here in attachement the full listing that I'm testing on a Raspberry PI board by connection trough telnet from a PC with Windows XP.

As usual suggestions are welcomed, thread programming is powerful but is also so insidious for a beginner like me.

Thanks to all!
Powermos

Code: Select all
# ----------------------------------------------------------------
# TCP Server and command parser with non blocking code (thread)
# ----------------------------------------------------------------
# Author : Powermos
# Date   : 02-09-2013
# Version: 1.0
# Date   : 06/09/2013
# Python : 2.7.3
# Bugs   : -
# ----------------------------------------------------------------
# Notes:
# To make code testing run the script with:
# python nonblocking1.py
# and connect to the server with e.g. telnet:
#   telnet 192.168.2.126 2626
# 192.168.2.126 is my Raspberry PI machine IP address
# or if you prefer test it on your machine open another
# command window and use simply this command:
#   telnet 127.0.0.1 2626
# CTRL+C allow quitting the program without crashing or stalling the
# machine (if you got different behaviour kindly let me know),
# using the command $QUIT% you should be able to close the application gently.
# ----------------------------------------------------------------

import os
import sys
import time
import socket
import select
import threading
import Queue

# ----------------------------------------------------------------
# GLOBAL VARIABLES
# ----------------------------------------------------------------
userList = [("USER1","1234","ADMIN"),("USER2","1234","GUEST"),("USER3","1234","GUEST")]                        # TCP: list of the enabled user
supportedCommand =("LOGIN","LOGOUT","QUIT","FREAD","FWRITE","FDELETE","DIRLIST","DIRDELETE","DIRNEW","DIRCHANGE")   # TCP: supported command list

COMMAND_FIELD_NUMBER = 3   # TCP: Each command take three field with : as separator
CMD_MAIN = 0            # TCP: index for the command type
CMD_FIELD_1 = 1            # TCP: index for the data field 1
CMD_FIELD_2 = 2            # TCP: index for the data field 2

userConn = Queue.Queue()   # TCP: queue for the current connected user (authorizated)
userCommand =Queue.Queue()   # TCP: queue for the command to process

TCPServerCommand = ""      # TCP: TCP command buffer (only for debug purpose)

# ----------------------------------------------------------------
# CLASS DEFINITIONS
# ----------------------------------------------------------------
class TCPThreadedServer(threading.Thread):
   def __init__(self, TCPPort, userConn, userCommand, **kwds):
      threading.Thread.__init__(self, **kwds)
      self.TCPPort = TCPPort
      self.stopflag = 0
      self.userConn = userConn
      self.userCommand = userCommand
      
   def run(self):
      global TCPServerCommand
      buffer = ""
      receivedData = ""
      
      #Standard TCP socket
      server = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
      server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR,1)

      # by using an empty string we are saying any you could also use
      # localhost or socket.gethostname() if you want to expose the server
      # past your network granted you have the correct ports fowarded
      server.bind(("",self.TCPPort))

      # tell the socket to listen for incoming connections
      server.listen(1)

      # tell the socket not to block while waiting for connections
      server.setblocking(False)
      input = [server]
      
      while not(self.stopflag):
         input_ready,output_ready,errors = select.select(input, [], [])
         for sock in input_ready:
            if sock is server:
               client,address = sock.accept()
               print("(DEBUG - TCPThreadedServer) Accepting socket from",address[0],address[1])
               input.append(client)
            else:
               receivedData = sock.recv(1)
               if receivedData:
                  # Read 1 byte from the incoming receivedData buffer
                  # Check the receivedData
                  if (receivedData == "$"):
                     buffer = "$"   # Found the command start, reset the buffer
                  elif ( (receivedData != "!") and (receivedData != "%") ):
                     buffer += receivedData
                  elif ( (receivedData == "!") or (receivedData == "%") ):
                     buffer += receivedData
                     # Copy the valid command into the global buffer
                     TCPServerCommand = buffer
                     # Send back ACK to client will be performed by the ProcessCommand class
                     #sock.send("OK"+chr(0x0D)+chr(0x0A)+" "+str(address[0])+" "+str(address[1]))
                     #self.sendACK(sock)
                     print("(DEBUG - TCPThreadedServer) Command --> " + buffer )
                     # Check if is a termination request
                     if (buffer == "$QUIT%"):
                        sock.close()
                        input.remove(sock)
                        self.stopflag = 1
                     # ---------------------------------------------------------
                     # Check if the command have a proper format
                     # $<CMD>:<DATA1>:<DATA2>%
                     # ---------------------------------------------------------
                     stringparsed = buffer.split(":")
                     numitem = len(stringparsed)
                     print("(DEBUG - TCPThreadedServer) Found: " + str(numitem) + " elements")
                     if (numitem == COMMAND_FIELD_NUMBER):
                        # Parse the command to find and split over each field
                        cmd_main = stringparsed[CMD_MAIN].strip()
                        cmd_main = cmd_main[1:]   # skip the starting char $
                        cmd_data1 = stringparsed[CMD_FIELD_1].strip()
                        cmd_data2 = stringparsed[CMD_FIELD_2].strip()
                        cmd_data2 = cmd_data2[:-1]   # skip the terminator %
                        if ( (cmd_main == "") or (cmd_data1 == "") or (cmd_data2 == "") ):
                           print "(DEBUG - TCPThreadedServer) Unknow command!"
                        else:
                           # Command frame structure is correct
                           print("(DEBUG - TCPThreadedServer) Valid command received")
                           print("(DEBUG - TCPThreadedServer) Command : " + cmd_main)
                           print("(DEBUG - TCPThreadedServer) Field 1 : " + cmd_data1)
                           print("(DEBUG - TCPThreadedServer) Field 2 : " + cmd_data2)
                           # Insert the command into the command queue
                           self.userCommand.put((sock,address[0], address[1],cmd_main,cmd_data1,cmd_data2))
                           self.userCommand.task_done()
                           pass
                     else:
                        print("(DEBUG - TCPThreadedServer) Unknow command!")
               else:
                  # I've read a empty string then the socket has closed
                  sock.close()
                  input.remove(sock)
                  print("(DEBUG - TCPThreadedServer) Dropped connection",address[0],address[1])
                  
      print ("(DEBUG - TCPThreadedServer) Exit from run")
      return
   
   def sendACK(self, sock):
      sock.send("$ACK%")
      return
      
   def sendNACK(self,sock):
      sock.send("$NACK%")
      return
      
   def stopserver(self):
      print ("(DEBUG - TCPThreadedServer - stopserver) Server shutdown!")
      self.stopflag = 1
      return

class ProcessCommand(threading.Thread):
   def __init__(self, userConn, userCommand, **kwds):
      threading.Thread.__init__(self, **kwds)
      self.userConn = userConn
      self.userCommand = userCommand
      self.stopflag = 0
            
   def run(self):
      objptr = None
      sock = None
      # Extract the first item from the command queue
      while not(self.stopflag):
         while not self.userCommand.empty():
            # Get the socket from the queue
            objptr = self.userCommand.get()
            # Extract from the object the single elements
            sock = objptr[0]
            clientip =objptr[1]
            clientport=objptr[2]
            cmd_main = str(objptr[3])
            cmd_data1 =str(objptr[4])
            cmd_data2=str(objptr[5])
            print ("(DEBUG - ProcessCommand) SOCKET INFO :", sock)
            print ("(DEBUG - ProcessCommand) Client IP   : " + str(clientip))
            print ("(DEBUG - ProcessCommand) Client PORT : " + str(clientport))
            print ("(DEBUG - ProcessCommand) COMMAND     : " + cmd_main)
            print ("(DEBUG - ProcessCommand) DATA FIELD 1: " + cmd_data1)
            print ("(DEBUG - ProcessCommand) DATA FIELD 2: " + cmd_data2)
            # Send back to the client the ACK
            self.sendACK(sock)
            # ------------------------------
            # NOW WE CAN PROCESS THE COMMAND
            # ------------------------------
            # -- TO DO --
            #
            #self.userCommand.task_done()
      print ("(DEBUG - ProcessCommand) Exit from run")
      return
   
   def sendACK(self, sock):
      sock.send("$ACK-FROM-PARSER%")
      return
      
   def sendNACK(self,sock):
      sock.send("$NACK-FROM-PARSER%")
      return
   
   def stopparser(self):
      print ("(DEBUG - ProcessCommand - stopparser) Parser shutdown!")
      self.stopflag = 1
      return
      
if __name__ == "__main__":

   print("Nonblocking sockets example!")
   print("Listening on port 2626")
   
   try:
      # Setup a TCP server
      TCPserver = TCPThreadedServer(2626, userConn, userCommand)
      TCPserver.setDaemon(True)
      TCPserver.start()
         
      # initialize the command buffer input
      # used only for debug purpose to manage command
      # we have to use the proper queue and command parser
      TCPServerCommand = ""
      
      # Setup the command parser thread
      TCPCmdParser = ProcessCommand(userConn, userCommand)
      TCPCmdParser.setDaemon(True)
      TCPCmdParser.start()
      
      # Infinite loop (keep alive the task)
      while 1:
         if (TCPServerCommand != ""):
            print("(DEBUG - main) New command! " + TCPServerCommand)
            if (TCPServerCommand == "$QUIT%"): break
            TCPServerCommand = ""
         pass
         
   except (KeyboardInterrupt, SystemExit):
      print("(main) Exception detected")
      TCPserver.stopserver()
      TCPCmdParser.stopparser()
   finally:
      time.sleep(1)
      print("(main) Exiting from main program")
      sys.exit()
Attachments
nonblocking1.zip
TCP server with embryonic parser
(2.95 KiB) Downloaded 75 times
Grinch
 
Posts: 10
Joined: Wed Sep 04, 2013 10:00 am

Re: Nonblocking sockets

Postby Grinch » Mon Sep 09, 2013 3:16 pm

Dear All,
following a update of the server authentication to the release 1.2
Now the server is able to:

- login a new user
- handle multiple request of same login from different socket (denied, socket will be closed to prevent hacker exploit)
- handle login of same user on the same socket (server reply that you're already connected)
- handle the logout command (should be of this form $LOGOUT:-:-%)
- handle a logout from quitting the client socket without using the above mentioned logout command
- handle a unknow command on a authenticated socket
- handle a unknow command on a not authenticated socket (socket will be closed to prevent hacker exploit)
- handle a bad login (socket will be closed to prevent hacker exploit)

Note
the socket closing by the server is notified to the client on the first char sended by the client after the back closing message returned from the server because the
for loop starting to line 88 on the source is executed only when there is some action into the input socket!

As usual let me know if you found bugs, your think and any useful suggestions to improve my code and the security.

Code: Select all
# ----------------------------------------------------------------
# TCP Server and command parser with non blocking code (thread)
# ----------------------------------------------------------------
# Author : Powermos
# Date   : 02-09-2013
# Version: 1.2
# Date   : 09/09/2013
# Python : 2.7.3
# Bugs   : -
# Link   : http://python-forum.org/viewtopic.php?f=25&t=216#p8386
# ----------------------------------------------------------------
# Notes:
# To make code testing run the script with:
# python nonblocking1.py
# and connect to the server with e.g. telnet:
#   telnet 192.168.2.126 2626
# 192.168.2.126 is my Raspberry PI machine IP address
# or if you prefer test it on your machine open another
# command window and use simply this command:
#   telnet 127.0.0.1 2626
# CTRL+C allow quitting the program without crashing or stalling the
# machine (if you got different behaviour kindly let me know),
# using the command $QUIT% you should be able to close the application gently.
# ----------------------------------------------------------------

import os
import sys
import time
import socket
import select
import threading
import Queue

# ----------------------------------------------------------------
# GLOBAL VARIABLES
# ----------------------------------------------------------------
userList = [("USER1","1234","ADMIN"),("USER2","1234","GUEST"),("USER3","1234","GUEST")]                        # TCP: list of the enabled user
supportedCommand =("LOGIN","LOGOUT","QUIT","FREAD","FWRITE","FDELETE","DIRLIST","DIRDELETE","DIRNEW","DIRCHANGE")   # TCP: supported command list
strCrLf=chr(0xD)+chr(0x0A)

COMMAND_FIELD_NUMBER = 3   # TCP: Each command take three field with : as separator
CMD_MAIN = 0            # TCP: index for the command type
CMD_FIELD_1 = 1            # TCP: index for the data field 1
CMD_FIELD_2 = 2            # TCP: index for the data field 2

userConn = []                  # TCP: queue for the current connected user (authorizated)
userCommand = Queue.Queue()         # TCP: queue for the command to process
processToserver = Queue.Queue()      # TCP: queue of the sock that have to be closed

TCPServerCommand = ""      # TCP: TCP command buffer (only for debug purpose)

# ----------------------------------------------------------------
# CLASS DEFINITIONS
# ----------------------------------------------------------------
class TCPThreadedServer(threading.Thread):
   def __init__(self, TCPPort, userCommand, processToserver, **kwds):
      threading.Thread.__init__(self, **kwds)
      self.TCPPort = TCPPort
      self.stopflag = 0
      self.userCommand = userCommand         # Exchange data queue from server to process manager
      self.processToserver = processToserver   # Exchange data queue from process data manager to the server
      
   def run(self):
      global TCPServerCommand
      buffer = ""
      receivedData = ""
      objptr = None
      bFlagSkip = False
      
      #Standard TCP socket
      server = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
      server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR,1)

      # by using an empty string we are saying any you could also use
      # localhost or socket.gethostname() if you want to expose the server
      # past your network granted you have the correct ports fowarded
      server.bind(("",self.TCPPort))

      # tell the socket to listen for incoming connections
      server.listen(5)

      # tell the socket not to block while waiting for connections
      server.setblocking(False)
      input = [server]
      
      while not(self.stopflag):
         input_ready,output_ready,errors = select.select(input, [], [])
         for sock in input_ready:
            
            # Check if the process manager have request to the server to close the current socket
            # may be for a double connection or for a LOGOUT command detected.
            # In this way every connection with a already logged user will be closed on the
            # next valid command that will be received after the login command.
            bFlagSkip = False
            while not self.processToserver.empty():
               objptr = self.processToserver.get()
               if ((objptr[0] == sock) and (objptr[1] == "REMOVE")):
                  # I've to close the socket
                  print("(DEBUG - TCPThreadedServer) Closing socket: " + str(sock) )
                  sock.close()
                  input.remove(sock)
                  bFlagSkip = True
               else:
                  # Leave the socket untouched (restore the previous get)
                  self.processToserver.put((objptr[0], objptr[1]))      
            
            if bFlagSkip:
               # Socket was closed then I can step over to the next socket into the input array
               pass
            else:
               if sock is server:
                  client,address = sock.accept()
                  print("(DEBUG - TCPThreadedServer) Accepting socket from",address[0],address[1])
                  input.append(client)
               else:
                  receivedData = sock.recv(1)
                  address = sock.getpeername()
                  if receivedData:
                     # Read 1 byte from the incoming receivedData buffer
                     # Check the receivedData
                     if (receivedData == "$"):
                        buffer = "$"   # Found the command start, reset the buffer
                     elif ( (receivedData != "!") and (receivedData != "%") ):
                        buffer += receivedData
                     elif ( (receivedData == "!") or (receivedData == "%") ):
                        buffer += receivedData
                        # Copy the valid command into the global buffer
                        TCPServerCommand = buffer
                        # Send back ACK to client will be performed by the ProcessCommand class
                        #sock.send("OK"+chr(0x0D)+chr(0x0A)+" "+str(address[0])+" "+str(address[1]))
                        #self.sendACK(sock)
                        print("(DEBUG - TCPThreadedServer) Command --> " + buffer )
                        # Check if is a termination request
                        if (buffer == "$QUIT%"):
                           sock.close()
                           input.remove(sock)
                           self.stopflag = 1
                        # ---------------------------------------------------------
                        # Check if the command have a proper format
                        # $<CMD>:<DATA1>:<DATA2>%
                        # ---------------------------------------------------------
                        stringparsed = buffer.split(":")
                        numitem = len(stringparsed)
                        print("(DEBUG - TCPThreadedServer) Found: " + str(numitem) + " elements")
                        if (numitem == COMMAND_FIELD_NUMBER):
                           # Parse the command to find and split over each field
                           cmd_main = stringparsed[CMD_MAIN].strip()
                           cmd_main = cmd_main[1:]   # skip the starting char $
                           cmd_data1 = stringparsed[CMD_FIELD_1].strip()
                           cmd_data2 = stringparsed[CMD_FIELD_2].strip()
                           cmd_data2 = cmd_data2[:-1]   # skip the terminator %
                           if ( (cmd_main == "") or (cmd_data1 == "") or (cmd_data2 == "") ):
                              print "(DEBUG - TCPThreadedServer) Unknow command!"
                           else:
                              # Command frame structure is correct
                              print("(DEBUG - TCPThreadedServer) Valid command received")
                              print("(DEBUG - TCPThreadedServer) IP      : " + str(address[0]))
                              print("(DEBUG - TCPThreadedServer) PORT    : " + str(address[1]))
                              print("(DEBUG - TCPThreadedServer) Command : " + cmd_main)
                              print("(DEBUG - TCPThreadedServer) Field 1 : " + cmd_data1)
                              print("(DEBUG - TCPThreadedServer) Field 2 : " + cmd_data2)
                              
                              # Socket is open then I can store the new command into the command queue
                              # but if the socket was closed I don't have to store nothing
                              # Insert the command into the command queue
                              self.userCommand.put((sock,address[0], address[1],cmd_main,cmd_data1,cmd_data2))
                              self.userCommand.task_done()
                        else:
                           print("(DEBUG - TCPThreadedServer) Unknow command!")
                  else:
                     # I've to remove the user from the connected user list
                     self.userCommand.put((None,address[0], address[1],"LOGOUT","",""))
                     self.userCommand.task_done()
                     # Close the socket and remove sock from the input list
                     sock.close()
                     input.remove(sock)
                     print("(DEBUG - TCPThreadedServer) Dropped connection",address[0],address[1])
            # self.processToserver.put((sock,"REMOVE"))
         pass         
      print ("(DEBUG - TCPThreadedServer) Exit from run\n")
      return
   
   def sendACK(self, sock):
      sock.send("$ACK%")
      return
      
   def sendNACK(self,sock):
      sock.send("$NACK%")
      return
      
   def stopserver(self):
      print ("(DEBUG - TCPThreadedServer - stopserver) Server shutdown!")
      self.stopflag = 1
      return

class ProcessCommand(threading.Thread):
   def __init__(self, userConn, userCommand, userList, processToserver, **kwds):
      threading.Thread.__init__(self, **kwds)
      self.userConn = userConn
      self.userCommand = userCommand
      self.userList = userList
      self.processToserver = processToserver
      self.stopflag = 0
            
   def run(self):
      objptr = None
      sock = None
      listptr = None
      # Extract the first item from the command queue
      while not(self.stopflag):
         while not self.userCommand.empty():
            # Get the pointer to the first element of the queue
            objptr = self.userCommand.get()
            # if sock is None I've to check if there is a related user to remove
            # or if not logged simply do nothing
            if (objptr[0] != None):
               # Extract from the object the single elements
               sock = objptr[0]
               clientip =objptr[1]
               clientport=objptr[2]
               cmd_main = str(objptr[3])
               cmd_data1 =str(objptr[4])
               cmd_data2=str(objptr[5])
               print ("(DEBUG - ProcessCommand) SOCKET INFO :", sock)
               print ("(DEBUG - ProcessCommand) Client IP   : " + str(clientip))
               print ("(DEBUG - ProcessCommand) Client PORT : " + str(clientport))
               print ("(DEBUG - ProcessCommand) COMMAND     : " + cmd_main)
               print ("(DEBUG - ProcessCommand) DATA FIELD 1: " + cmd_data1)
               print ("(DEBUG - ProcessCommand) DATA FIELD 2: " + cmd_data2)
               # Send back to the client the ACK
               #self.sendACK(sock)
               # ------------------------------
               # NOW WE CAN PROCESS THE COMMAND
               # ------------------------------
               # 1. Check if is a LOGIN command
               #
               # Reply to the client:
               # LOGIN-EXISTING        : login command yet executed on same socket
               # LOGIN-DUPLICATE-DENIED: attempt to open a new login of a connected user with another socket is denied, socket will be closed
               # LOGIN-DENIED          : user is not authorized or password is not correct, socket will be closed
               # LOGIN-SUCCESFULLY     : user succesfully logged
               # ------------------------------
               if (cmd_main == "LOGIN"):
                  # Chck if the user is already registered (same socket and user name)
                  # 1. same socket (no action taken)
                  # 2. different socket
                  c = 0
                  while c < len(self.userConn):
                     listptr = self.userConn[c]
                     if (listptr[3] == cmd_data1):
                        # User already present into the logged list
                        if (listptr[0] == sock):
                           # User is already connected on same socket
                           self.sendMessage(sock, "$LOGIN-EXISTING%")
                        else:
                           # User is already logged from a different socket, the current connection will be closed
                           # but I've also to remove, from the user list, the user that has tryed to login with
                           # one account which was previously taken from another socket
                           self.sendMessage(sock, "$LOGIN-DUPLICATE-DENIED%")
                           # Tell to server to remove the socket and close the connection with the client
                           self.processToserver.put((sock,"REMOVE"))
                           # Remove the current user, if is logged, from the userlist
                           x = 0
                           while x < len(self.userConn):
                              onerow = self.userConn[x]
                              if (sock == onerow[0]):
                                 del self.userConn[x]
                                 break
                              x = x + 1
                        break
                     c = c + 1
                  
                  # If the user isn't into the active list I must check if it is into the authorized users list
                  if (c == len(self.userConn)):
                     # User not present check if this is present into the authorized account set
                     c = 0
                     while c < len(self.userList):
                        listptr = self.userList[c]
                        if ((listptr[0] == cmd_data1) and (listptr[1] == cmd_data2)):
                           break
                        c = c + 1
                     if (c == len(self.userList)):
                        self.sendMessage(sock, "$LOGIN-DENIED%")
                        # Tell to server to remove the socket and close the connection with the client
                        self.processToserver.put((sock,"REMOVE"))
                     else:
                        # Succesfully login
                        self.userConn.append((sock, clientip, clientport, cmd_data1, listptr[2]))
                        # Notify it to the client
                        self.sendMessage(sock, "$LOGIN-SUCCESFULLY%")
                        
                  # Print the whole connected list for debug
                  c = 0
                  while c < len(self.userConn):
                     print("User #"+str(c)+": " + str(self.userConn[c]))
                     c = c + 1
               else:
                  # Command is different than LOGIN then before any other things I've to check
                  # if the user is logged
                  c = 0
                  while c < len(self.userConn):
                     onerow = self.userConn[c]
                     if (sock == onerow[0]):   break
                     c = c + 1
                     
                  if ((c == len(self.userConn)) and (c != 0)):
                     self.sendMessage(sock,"$LOGIN-NEEDED%")
                     # Tell to server to remove the socket and close the connection with the client
                     self.processToserver.put((sock,"REMOVE"))
                  else:
                     # ----------------------------------------------
                     # Authenticated user, hence I can analyze the command
                     # ----------------------------------------------
                     
                     # ------------------------------
                     # 2. Check if the LOGOUT command
                     # ------------------------------
                     if (cmd_main == "LOGOUT"):
                        c = 0
                        while c < len(self.userConn):
                           onerow = self.userConn[c]
                           if (sock == onerow[0]):
                              # The logout command was for a logged socket
                              del self.userConn[c]
                              # Tell to server to remove the socket and close the connection with the client
                              self.processToserver.put((sock,"REMOVE"))
                              break
                           c = c + 1
                        if ((c == len(self.userConn)) and (c != 0)):
                           # Normally I should not reach this statement!
                           print("(DEBUG - ProcessCommand) ERROR in processing the LOGOUT command!\n")
                     elif (cmd_main == "SOMECOMMAND"):
                        # ------------------------------
                        # 3. Check for the ACTIVEUSER command
                        # ------------------------------
                        # .. TO DO ..
                        #
                        pass
                     else:
                        self.sendNACK(sock)
                     
            else:
               # The sock is equal to None then I've detected a forced logout, the socket was already closed
               # by the server section (user has close the connection without using the provided LOGOUT command).
               # Now I've just to clean the connected user list if the user was still logged or
               # if the user was not logged simply take no action at all.
               # Extract from the object the single elements
               print("(DEBUG - ProcessComman) Socket pointer was closed without logout command")

               # Print the whole connected list for debug
               c = 0
               while c < len(self.userConn):
                  print("User #"+str(c)+": " + str(self.userConn[c]))
                  c = c + 1

               # Search for the connection
               sock = objptr[0]
               clientip =objptr[1]
               clientport=objptr[2]
               cmd_main = str(objptr[3])
               cmd_data1 =str(objptr[4])
               cmd_data2=str(objptr[5])
               if (cmd_main == "LOGOUT"):
                  # Scan the logged user list
                  c = 0
                  while c < len(self.userConn):
                     onerow = self.userConn[c]
                     if ((clientip == onerow[1]) and (clientport == onerow[2])):
                        # User was logged, clean the connected user list item
                        del self.userConn[c]
                        # Is not possibile to send back data because the socket was closed!
                        print("(DEBUG - ProcessCommand) Logout from " + str(clientip) + ":" + str(clientport)+" was succesfully!\n")
                        break
                     c = c + 1
                     
                  if ((c == len(self.userConn)) and (c != 0)):
                     # User was not logged no action to take
                     print("(DEBUG - ProcessCommand) User was not logged, it has simply closed the client side\n")
               
               # Print the whole connected list for debug
               c = 0
               while c < len(self.userConn):
                  print("User #"+str(c)+": " + str(self.userConn[c]))
                  c = c + 1

      # End of the main while loop            
      print ("(DEBUG - ProcessCommand) Exit from run\n")
      return
   
   def sendACK(self, sock):
      sock.send("$ACK-FROM-PARSER%")
      return
      
   def sendNACK(self,sock):
      sock.send("$NACK-FROM-PARSER%")
      return
      
   def sendMessage(self,sock, msg):
      sock.send(msg)
      return
   
   def checkLoginCommand(self, sock):
      # Look if the user exist
      return
   
   
   def stopparser(self):
      print ("(DEBUG - ProcessCommand - stopparser) Parser shutdown!")
      self.stopflag = 1
      return
      
if __name__ == "__main__":

   print("Nonblocking sockets example!")
   print("Listening on port 2626")
   
   try:
      # Setup a TCP server
      TCPserver = TCPThreadedServer(2626, userCommand, processToserver)
      TCPserver.setDaemon(True)
      TCPserver.start()
         
      # initialize the command buffer input
      # used only for debug purpose to manage command
      # we have to use the proper queue and command parser
      TCPServerCommand = ""
      
      # Setup the command parser thread
      TCPCmdParser = ProcessCommand(userConn, userCommand, userList, processToserver)
      TCPCmdParser.setDaemon(True)
      TCPCmdParser.start()
      
      # Infinite loop (keep alive the task)
      while 1:
         if (TCPServerCommand != ""):
            print("(DEBUG - main) New command! " + TCPServerCommand)
            if (TCPServerCommand == "$QUIT%"): break
            TCPServerCommand = ""
         pass
         
   except (KeyboardInterrupt, SystemExit):
      print("(main) Exception detected")
      TCPserver.stopserver()
      TCPCmdParser.stopparser()
   finally:
      time.sleep(1)
      print("(main) Exiting from main program")
      sys.exit()
Attachments
nonblocking12.zip
Release 1.2 - 09/09/2013
(4.95 KiB) Downloaded 75 times
Grinch
 
Posts: 10
Joined: Wed Sep 04, 2013 10:00 am

Re: Nonblocking sockets

Postby Grinch » Tue Sep 10, 2013 2:53 pm

Dear all,
updated source, correct the login into the ProcessCommand section due to some side effects, corrected the socket closing code which is into the TCPThreadedServer section.

As usual let me know if you found bugs, your think and any useful suggestions to improve my code and the security.

Thank you!
Powermos

Code: Select all
# ----------------------------------------------------------------
# TCP Server and command parser with non blocking code (thread)
# ----------------------------------------------------------------
# Author : Powermos
# Date   : 02-09-2013
# Version: 1.3
# Date   : 10/09/2013
# Python : 2.7.3
# Bugs   : -
# Link   : http://python-forum.org/viewtopic.php?f=25&t=216#p8386
# ----------------------------------------------------------------
# Notes:
# To make code testing run the script with:
# python nonblocking1.py
# and connect to the server with e.g. telnet:
#   telnet 192.168.2.126 2626
# 192.168.2.126 is my Raspberry PI machine IP address
# or if you prefer test it on your machine open another
# command window and use simply this command:
#   telnet 127.0.0.1 2626
# CTRL+C allow quitting the program without crashing or stalling the
# machine (if you got different behaviour kindly let me know),
# using the command $QUIT% you should be able to close the application gently.
# ----------------------------------------------------------------

import os
import sys
import time
import socket
import select
import threading
import Queue

# ----------------------------------------------------------------
# GLOBAL VARIABLES
# ----------------------------------------------------------------
userList = [("USER1","1234","ADMIN"),("USER2","1234","GUEST"),("USER3","1234","GUEST")]                        # TCP: list of the enabled user
supportedCommand =("LOGIN","LOGOUT","QUIT","FREAD","FWRITE","FDELETE","DIRLIST","DIRDELETE","DIRNEW","DIRCHANGE")   # TCP: supported command list
strCrLf=chr(0xD)+chr(0x0A)

COMMAND_FIELD_NUMBER = 3   # TCP: Each command take three field with : as separator
CMD_MAIN = 0            # TCP: index for the command type
CMD_FIELD_1 = 1            # TCP: index for the data field 1
CMD_FIELD_2 = 2            # TCP: index for the data field 2

userConn = []                  # TCP: queue for the current connected user (authorizated)
userCommand = Queue.Queue()         # TCP: queue for the command to process
processToserver = Queue.Queue()      # TCP: queue of the sock that have to be closed

TCPServerCommand = ""      # TCP: TCP command buffer (only for debug purpose)

# ----------------------------------------------------------------
# CLASS DEFINITIONS
# ----------------------------------------------------------------
class TCPThreadedServer(threading.Thread):
   def __init__(self, TCPPort, userCommand, processToserver, **kwds):
      threading.Thread.__init__(self, **kwds)
      self.TCPPort = TCPPort
      self.stopflag = 0
      self.userCommand = userCommand         # Exchange data queue from server to process manager
      self.processToserver = processToserver   # Exchange data queue from process data manager to the server
      
   def run(self):
      global TCPServerCommand
      buffer = ""
      receivedData = ""
      objptr = None
      bFlagSkip = False
      
      #Standard TCP socket
      server = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
      server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR,1)

      # by using an empty string we are saying any you could also use
      # localhost or socket.gethostname() if you want to expose the server
      # past your network granted you have the correct ports fowarded
      server.bind(("",self.TCPPort))

      # tell the socket to listen for incoming connections
      server.listen(5)

      # tell the socket not to block while waiting for connections
      server.setblocking(False)
      input = [server]
      
      while not(self.stopflag):
         input_ready,output_ready,errors = select.select(input, [], [])
         for sock in input_ready:
            
            #Check if the process manager have request to the server to close a socket.
            # Because the commands from the process manager are stored in a queue,
            # when we did a item extraction we need to check if this one is equal
            # to the current active socket. If true we can close the socket but if this is not
            # the current processed socket we have to keep it in a temporary queue.
            # Sooner or later we're able to find the right socket and then we can close it.
            # When all the commands process queue will be scanned we will be able to restore
            # all the others sockets preserved into the temporary queue into the processToserver queue
            # for the upcoming socket events.
            tmplist = Queue.Queue()
            bFlagSkip = False
            while not self.processToserver.empty():
               objptr = self.processToserver.get()
               if ((objptr[0] == sock) and (objptr[1] == "REMOVE")):
                  # I've to close the socket
                  print("(DEBUG - TCPThreadedServer) Closing socket: " + str(sock) )
                  sock.close()
                  input.remove(sock)
                  bFlagSkip = True
               else:
                  # Save the socket into the temporary queue
                  tmplist.put(objptr)
            # Restore the items stored into the temporary queue into the original queue
            # if a sock was closed this one will be not present into the temporary list
            # and then also into the processToserver queue. This trick because the queue object
            # don't have a delete method of one item which is inside the queue structure.
            while not tmplist.empty():
               self.processToserver.put(tmplist.get())
            
            if bFlagSkip:
               # Socket was closed then I can step over to the next socket into the input array
               pass
            else:
               if sock is server:
                  client,address = sock.accept()
                  print("(DEBUG - TCPThreadedServer) Accepting socket from",address[0],address[1])
                  input.append(client)
               else:
                  receivedData = sock.recv(1)
                  address = sock.getpeername()
                  if receivedData:
                     # Read 1 byte from the incoming receivedData buffer
                     # Check the receivedData
                     if (receivedData == "$"):
                        buffer = "$"   # Found the command start, reset the buffer
                     elif ( (receivedData != "!") and (receivedData != "%") ):
                        buffer += receivedData
                     elif ( (receivedData == "!") or (receivedData == "%") ):
                        buffer += receivedData
                        # Copy the valid command into the global buffer
                        TCPServerCommand = buffer
                        # Send back ACK to client will be performed by the ProcessCommand class
                        #sock.send("OK"+chr(0x0D)+chr(0x0A)+" "+str(address[0])+" "+str(address[1]))
                        #self.sendACK(sock)
                        print("(DEBUG - TCPThreadedServer) Command --> " + buffer )
                        # Check if is a termination request
                        if (buffer == "$QUIT%"):
                           sock.close()
                           input.remove(sock)
                           self.stopflag = 1
                        # ---------------------------------------------------------
                        # Check if the command have a proper format
                        # $<CMD>:<DATA1>:<DATA2>%
                        # ---------------------------------------------------------
                        stringparsed = buffer.split(":")
                        numitem = len(stringparsed)
                        print("(DEBUG - TCPThreadedServer) Found: " + str(numitem) + " elements")
                        if (numitem == COMMAND_FIELD_NUMBER):
                           # Parse the command to find and split over each field
                           cmd_main = stringparsed[CMD_MAIN].strip()
                           cmd_main = cmd_main[1:]   # skip the starting char $
                           cmd_data1 = stringparsed[CMD_FIELD_1].strip()
                           cmd_data2 = stringparsed[CMD_FIELD_2].strip()
                           cmd_data2 = cmd_data2[:-1]   # skip the terminator %
                           if ( (cmd_main == "") or (cmd_data1 == "") or (cmd_data2 == "") ):
                              print "(DEBUG - TCPThreadedServer) Unknow command!"
                           else:
                              # Command frame structure is correct
                              print("(DEBUG - TCPThreadedServer) Valid command received")
                              print("(DEBUG - TCPThreadedServer) IP      : " + str(address[0]))
                              print("(DEBUG - TCPThreadedServer) PORT    : " + str(address[1]))
                              print("(DEBUG - TCPThreadedServer) Command : " + cmd_main)
                              print("(DEBUG - TCPThreadedServer) Field 1 : " + cmd_data1)
                              print("(DEBUG - TCPThreadedServer) Field 2 : " + cmd_data2)
                              
                              # Socket is open then I can store the new command into the command queue
                              # but if the socket was closed I don't have to store nothing
                              # Insert the command into the command queue
                              self.userCommand.put((sock,address[0], address[1],cmd_main,cmd_data1,cmd_data2))
                              self.userCommand.task_done()
                        else:
                           print("(DEBUG - TCPThreadedServer) Unknow command!")
                  else:
                     # I've to remove the user from the connected user list
                     self.userCommand.put((None,address[0], address[1],"LOGOUT","",""))
                     self.userCommand.task_done()
                     # Close the socket and remove sock from the input list
                     sock.close()
                     input.remove(sock)
                     print("(DEBUG - TCPThreadedServer) Dropped connection",address[0],address[1])
         pass         
      print ("(DEBUG - TCPThreadedServer) Exit from run\n")
      return
   
   def sendACK(self, sock):
      sock.send("$ACK%")
      return
      
   def sendNACK(self,sock):
      sock.send("$NACK%")
      return
      
   def stopserver(self):
      print ("(DEBUG - TCPThreadedServer - stopserver) Server shutdown!")
      self.stopflag = 1
      return

class ProcessCommand(threading.Thread):
   def __init__(self, userConn, userCommand, userList, processToserver, **kwds):
      threading.Thread.__init__(self, **kwds)
      self.userConn = userConn
      self.userCommand = userCommand
      self.userList = userList
      self.processToserver = processToserver
      self.stopflag = 0
            
   def run(self):
      objptr = None
      sock = None
      listptr = None
      msg = ""
      # Extract the first item from the command queue
      while not(self.stopflag):
         while not self.userCommand.empty():
            # Get the pointer to the first element of the queue
            objptr = self.userCommand.get()
            # if sock is None I've to check if there is a related user to remove
            # or if not logged simply do nothing
            if (objptr[0] != None):
               # Extract from the object the single elements
               sock = objptr[0]
               clientip =objptr[1]
               clientport=objptr[2]
               cmd_main = str(objptr[3])
               cmd_data1 =str(objptr[4])
               cmd_data2=str(objptr[5])
               print ("(DEBUG - ProcessCommand) SOCKET INFO :", sock)
               print ("(DEBUG - ProcessCommand) Client IP   : " + str(clientip))
               print ("(DEBUG - ProcessCommand) Client PORT : " + str(clientport))
               print ("(DEBUG - ProcessCommand) COMMAND     : " + cmd_main)
               print ("(DEBUG - ProcessCommand) DATA FIELD 1: " + cmd_data1)
               print ("(DEBUG - ProcessCommand) DATA FIELD 2: " + cmd_data2)
               # Send back to the client the ACK
               #self.sendACK(sock)
               # ------------------------------
               # NOW WE CAN PROCESS THE COMMAND
               # ------------------------------
               # 1. Check if is a LOGIN command
               # ------------------------------
               if (cmd_main == "LOGIN"):
                  # 1. socket logged, same user name, no action taken (user is already logged)
                  # 2. socket not logged, but user name is used from another socket, not allowed (duplicate)
                  # 3. socket logged, user name not used and different from the previous login, not allowed (change user)
                  # 4. socket logged, user name is used from another socket, not allowed
                  # 5. socket isn't logged and also the user name isn't in use, try to register
                  
                  bFlagSocketFound = False
                  bFlagUserNameFound = False
                  bFlagAccountFound = False
                  c = 0
                  while c < len(self.userConn):
                     listptr = self.userConn[c]
                     if (listptr[0] == sock):
                        bFlagSocketFound = True
                     if (listptr[3] == cmd_data1):
                        bFlagUserNameFound = True
                     if ((listptr[0] == sock) and (listptr[3] == cmd_data1)):
                        bFlagAccountFound = True
                     c=c+1
                     
                  if bFlagAccountFound:
                     # 1. user is already connected on same socket
                     self.sendMessage(sock, "$LOGIN-EXISTING%")
                  else:
                     if bFlagSocketFound:
                        # -----------------
                        # socket logged
                        # -----------------
                        if bFlagUserNameFound:
                           # 4. socket logged, user name is used from another socket, not allowed
                           self.sendMessage(sock, "$NOT-ALLOWED-USER-USED%")
                        else:
                           # 3. socket logged, user name not used and different from previous login, not allowed (change user require first LOGOUT)
                           self.sendMessage(sock, "$NOT-ALLOWED-DIRECT-CHANGE-USER%")
                     else:
                        # -----------------
                        # socket not logged
                        # -----------------
                        if bFlagUserNameFound:
                           # 2. socket not logged, but user name is used from another socket, not allowed (duplicate)
                           self.sendMessage(sock, "$NOT-ALLOWED-USER-IN-USE%")
                           # Tell to server to remove the socket and close the connection with the client
                           self.processToserver.put((sock,"REMOVE"))
                           # Remove the current user, if is logged, from the userlist
                           c = 0
                           while c < len(self.userConn):
                              onerow = self.userConn[c]
                              if (sock == onerow[0]):
                                 del self.userConn[c]
                                 break
                              c = c + 1
                        else:
                           # 5. socket non logged and username is not already used
                           bFlagAuthorized = False
                           c = 0
                           while c < len(self.userList):
                              listptr = self.userList[c]
                              if ((listptr[0] == cmd_data1) and (listptr[1] == cmd_data2)):
                                 bFlagAuthorized = True
                                 break
                              c = c + 1
                              
                           if (bFlagAuthorized):
                              # Succesfully login
                              self.userConn.append((sock, clientip, clientport, cmd_data1, listptr[2]))
                              self.sendMessage(sock, "$LOGIN-SUCCESFULLY%")
                           else:
                              # The specified account is not listed into the authorized user list
                              self.sendMessage(sock, "$LOGIN-DENIED%")
                              # Tell to server to remove the socket and close the connection with the client
                              self.processToserver.put((sock,"REMOVE"))
               else:
                  # Command received was different than LOGIN
                  # then before any other things I've to check
                  # if the user is logged or not and take proper action
                  
                  print("(DEBUG - ProcessCommand) Not a login command")
                                       
                  c = 0
                  bFlagIsLogged = False
                  while c < len(self.userConn):
                     onerow = self.userConn[c]
                     if (sock == onerow[0]):
                        bFlagIsLogged = True
                     c = c + 1
                  
                  if (not bFlagIsLogged):
                     self.sendMessage(sock,"$LOGIN-NEEDED%")
                     # Tell to server to remove the socket and close the connection with the client
                     self.processToserver.put((sock,"REMOVE"))
                  else:
                     # ----------------------------------------------
                     # Authenticated user, hence I can analyze the command
                     # ----------------------------------------------
                     
                     # ------------------------------
                     # 2. Check if the LOGOUT command
                     # ------------------------------
                     if (cmd_main == "LOGOUT"):
                        c = 0
                        while c < len(self.userConn):
                           onerow = self.userConn[c]
                           if (sock == onerow[0]):
                              # The logout command was for a logged socket
                              del self.userConn[c]
                              # Tell to server to remove the socket and close the connection with the client
                              self.processToserver.put((sock,"REMOVE"))
                              break
                           c = c + 1
                        if ((c == len(self.userConn)) or (c == 0)):
                           # Normally I should not reach this statement!
                           print("(DEBUG - ProcessCommand) ERROR in processing the LOGOUT command!\n")
                     elif (cmd_main == "ACTIVEUSER"):
                        # ------------------------------
                        # 3. Check for the ACTIVEUSER command
                        # Enabled only for ADMIN user
                        # ------------------------------
                        if (self.userLevel(sock, self.userConn) == "ADMIN"):
                           # Print the connected user list
                           c = 0
                           while c < len(self.userConn):
                              msg = "User #"+str(c)+": " + str(self.userConn[c]) + chr(0x0D) + chr(0x0A)
                              self.sendMessage(sock,msg)
                              c = c + 1
                        else:
                           msg = "$NOT-ALLOWED%"
                           self.sendMessage(sock,msg)
                     elif (cmd_main == "XXXX"):
                        # ------------------------------
                        # 4. Check for the XXXX command
                        # Enabled only for ADMIN/GUEST user
                        # ------------------------------
                        pass
                     else:
                        self.sendNACK(sock)
                     
            else:
               # The sock is equal to None then I've detected a forced logout, the socket was already closed
               # by the server section (user has close the connection without using the provided LOGOUT command).
               # Now I've just to clean the connected user list if the user was still logged or
               # if the user was not logged simply take no action at all.
               # Extract from the object the single elements
               print("(DEBUG - ProcessComman) Socket pointer was closed without logout command")

               # Print the whole connected list for debug
               c = 0
               while c < len(self.userConn):
                  print("User #"+str(c)+": " + str(self.userConn[c]))
                  c = c + 1

               # Search for the connection
               sock = objptr[0]
               clientip =objptr[1]
               clientport=objptr[2]
               cmd_main = str(objptr[3])
               cmd_data1 =str(objptr[4])
               cmd_data2=str(objptr[5])
               if (cmd_main == "LOGOUT"):
                  # Scan the logged user list
                  c = 0
                  while c < len(self.userConn):
                     onerow = self.userConn[c]
                     if ((clientip == onerow[1]) and (clientport == onerow[2])):
                        # User was logged, clean the connected user list item
                        del self.userConn[c]
                        # Is not possibile to send back data because the socket was closed!
                        print("(DEBUG - ProcessCommand) Logout from " + str(clientip) + ":" + str(clientport)+" was succesfully!\n")
                        break
                     c = c + 1
                     
                  if ((c == len(self.userConn)) or (c == 0)):
                     # User was not logged no action to take
                     print("(DEBUG - ProcessCommand) User was not logged, it has simply closed the client side\n")
               
               # Print the whole connected list for debug
               c = 0
               while c < len(self.userConn):
                  print("User #"+str(c)+": " + str(self.userConn[c]))
                  c = c + 1

      # End of the main while loop            
      print ("(DEBUG - ProcessCommand) Exit from run\n")
      return
   
   def sendACK(self, sock):
      sock.send("$ACK-FROM-PARSER%")
      return
      
   def sendNACK(self,sock):
      sock.send("$NACK-FROM-PARSER%")
      return
      
   def sendMessage(self,sock, msg):
      sock.send(msg)
      return
   
   def userLevel(self, sock, userConn):
      # Scan the user list and return the privilege level
      level = "GUEST"
      userList = userConn
      c = 0
      while c < len(userList):
         onerow = userList[c]
         if (sock == onerow[0]):
            level = onerow[4]
            break
         c = c + 1
      return(level)
   
   
   def stopparser(self):
      print ("(DEBUG - ProcessCommand - stopparser) Parser shutdown!")
      self.stopflag = 1
      return
      
if __name__ == "__main__":

   print("Nonblocking sockets example!")
   print("Listening on port 2626")
   
   try:
      # Setup a TCP server
      TCPserver = TCPThreadedServer(2626, userCommand, processToserver)
      TCPserver.setDaemon(True)
      TCPserver.start()
         
      # initialize the command buffer input
      # used only for debug purpose to manage command
      # we have to use the proper queue and command parser
      TCPServerCommand = ""
      
      # Setup the command parser thread
      TCPCmdParser = ProcessCommand(userConn, userCommand, userList, processToserver)
      TCPCmdParser.setDaemon(True)
      TCPCmdParser.start()
      
      # Infinite loop (keep alive the task)
      while 1:
         if (TCPServerCommand != ""):
            print("(DEBUG - main) New command! " + TCPServerCommand)
            if (TCPServerCommand == "$QUIT%"): break
            TCPServerCommand = ""
         pass
         
   except (KeyboardInterrupt, SystemExit):
      print("(main) Exception detected")
      TCPserver.stopserver()
      TCPCmdParser.stopparser()
   finally:
      time.sleep(1)
      print("(main) Exiting from main program")
      sys.exit()
Attachments
nonblocking13.zip
TCP command server with simple authentication
(5.38 KiB) Downloaded 73 times
Grinch
 
Posts: 10
Joined: Wed Sep 04, 2013 10:00 am

Re: Nonblocking sockets

Postby Grinch » Wed Sep 11, 2013 12:34 pm

Dear all,
updated source to release 1.4.
The big issue present inside of all of the previous releases was into the handling of the buffer used to capture and processing the incoming commands from the client connections.
Into all old releases, the buffer into the TCPThreadedServer class is unique for all the connected sockets. When more than one client try to communicate with the server is a same time the buffer is overwritten and the command messed up. The behaviour is correct only when there is one client at a time talking with the server.
I've found this bug during some tests where more client try to send commands to the server at same time.
To solve this bad behaviour I've added a new list with name userBuffer, each row of this list is a tuple (sock,<string>).
When a new connection request was detected by the server, a new row into the userBuffer list is created, likewise when a socket is closed the related row into the userBuffer list will be removed.
The code used to process the incomings chars is very simple to adapt.
When a incoming char is detected, this one will be stored on the correct buffer which is inside the userBuffer list (the socket is used to perform the list scan), in this way every socket will take the your own buffer to prevent the buffer overlapping when more than one client attempts to communicate with the server.

As usual let me know if you found bugs, your think and any useful suggestions to improve my code and the security.

Regards
Powermos

Code: Select all
# ----------------------------------------------------------------
# TCP Server and command parser with non blocking code (thread)
# ----------------------------------------------------------------
# Author : Powermos
# Date   : 02-09-2013
# Version: 1.4
# Date   : 11/09/2013
# Python : 2.7.3
# Bugs   : -
# Link   : http://python-forum.org/viewtopic.php?f=25&t=216#p8386
# ----------------------------------------------------------------
# Notes:
# To make code testing run the script with:
# python nonblocking1.py
# and connect to the server with e.g. telnet:
#   telnet 192.168.2.126 2626
# 192.168.2.126 is my Raspberry PI machine IP address
# or if you prefer test it on your machine open another
# command window and use simply this command:
#   telnet 127.0.0.1 2626
# CTRL+C allow quitting the program without crashing or stalling the
# machine (if you got different behaviour kindly let me know),
# using the command $QUIT% you should be able to close the application gently.
# ----------------------------------------------------------------

import os
import sys
import time
import socket
import select
import threading
import Queue

# ----------------------------------------------------------------
# GLOBAL VARIABLES
# ----------------------------------------------------------------
userList = [("USER1","1234","ADMIN"),("USER2","1234","ADMIN"),("USER3","1234","GUEST")]                        # TCP: list of the enabled user
supportedCommand =("LOGIN","LOGOUT","QUIT","FREAD","FWRITE","FDELETE","DIRLIST","DIRDELETE","DIRNEW","DIRCHANGE")   # TCP: supported command list
strCrLf=chr(0xD)+chr(0x0A)

COMMAND_FIELD_NUMBER = 3   # TCP: Each command take three field with : as separator
CMD_MAIN = 0            # TCP: index for the command type
CMD_FIELD_1 = 1            # TCP: index for the data field 1
CMD_FIELD_2 = 2            # TCP: index for the data field 2

userConn = []                  # TCP: queue for the current connected user (authorizated)
userBuffer = []                  # TCP: list (sock, buffer) for every active connection
userCommand = Queue.Queue()         # TCP: queue for the command to process
processToserver = Queue.Queue()      # TCP: queue of the sock that have to be closed

TCPServerCommand = ""      # TCP: TCP command buffer (only for debug purpose)

# ----------------------------------------------------------------
# CLASS DEFINITIONS
# ----------------------------------------------------------------
class TCPThreadedServer(threading.Thread):
   def __init__(self, TCPPort, userCommand, processToserver, userBuffer, **kwds):
      threading.Thread.__init__(self, **kwds)
      self.TCPPort = TCPPort
      self.stopflag = 0
      self.userCommand = userCommand         # Exchange data queue from server to process manager
      self.processToserver = processToserver   # Exchange data queue from process data manager to the server
      self.userBuffer = userBuffer         # Buffer related the incoming chars from the opened socket
      
   def run(self):
      global TCPServerCommand
      buffer = ""
      receivedData = ""
      objptr = None
      bFlagSkip = False
      
      #Standard TCP socket
      server = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
      server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR,1)

      # by using an empty string we are saying any you could also use
      # localhost or socket.gethostname() if you want to expose the server
      # past your network granted you have the correct ports fowarded
      server.bind(("",self.TCPPort))

      # tell the socket to listen for incoming connections
      server.listen(5)

      # tell the socket not to block while waiting for connections
      server.setblocking(False)
      input = [server]
      
      while not(self.stopflag):
         input_ready,output_ready,errors = select.select(input, [], [])
         for sock in input_ready:
            
            # ----------------------------------------------------------------------------------------
            # Check if the process manager have request to the server to close a socket.
            # ----------------------------------------------------------------------------------------
            # Because the commands from the process manager are stored in a queue,
            # when we did a item extraction we need to check if this one is equal
            # to the current active socket. If true we can close the socket but if this is not
            # the current processed socket we have to keep it in a temporary queue.
            # Sooner or later we're able to find the right socket and then we can close it.
            # When all the commands process queue will be scanned we will be able to restore
            # all the others sockets preserved into the temporary queue into the processToserver queue
            # for the upcoming socket events.
            tmplist = Queue.Queue()
            bFlagSkip = False
            while not self.processToserver.empty():
               objptr = self.processToserver.get()
               if ((objptr[0] == sock) and (objptr[1] == "REMOVE")):
                  # I've to close the socket
                  print("(DEBUG - TCPThreadedServer) Closing socket: " + str(sock) )
                  sock.close()
                  input.remove(sock)
                  # Delete the buffer for the socket
                  self.removeBuffer(sock,self.userBuffer)
                  bFlagSkip = True
               else:
                  # Save the socket into the temporary queue
                  tmplist.put(objptr)
            # Restore the items stored into the temporary queue into the original queue
            # if a sock was closed this one will be not present into the temporary list
            # and then also into the processToserver queue. This trick because the queue object
            # don't have a delete method of one item which is inside the queue structure.
            while not tmplist.empty():
               self.processToserver.put(tmplist.get())
            # ----------------------------------------------------------------------------------------
            
            if bFlagSkip:
               # Socket was closed then I can step over to the next socket into the input array
               pass
            else:
               if sock is server:
                  client,address = sock.accept()
                  print("(DEBUG - TCPThreadedServer) Accepting socket from",address[0],address[1])
                  input.append(client)
                  # Create the buffer related to the new socket
                  self.createBuffer(client,self.userBuffer)
                  # DEBUG - print the whole buffer
                  self.listBuffer(self.userBuffer)
               else:
               
                  try:
                     # Read 1 byte from the incoming receivedData buffer
                     receivedData = sock.recv(1)
                     address = sock.getpeername()
                  except Exception, ex:
                     print("----------------------------------")
                     template = "An exception of type {0} occured. Arguments:\n{1!r}"
                     message = template.format(type(ex).__name__, ex.args)
                     print(message)
                     print(type(ex))
                     print(ex)
                     print("----------------------------------")
                     # By setting receivedData to None the TCPThreadedServer close the current socket in a clean way
                     receivedData = None
                     
                  if receivedData:
                     
                     # In order to handle properly several incoming connection is necessary handle also
                     # one buffer for every sock, actually I've only one buffer then when
                     # multiple user send command at same time the buffer will be filled by the
                     # command from the different user!!
                     
                     # Check the receivedData
                     if (receivedData == "$"):
                        # Found the command start, reset the buffer
                        self.deleteBuffer(sock,self.userBuffer)
                        self.writeBuffer(sock, "$", self.userBuffer)
                     elif ( (receivedData != "!") and (receivedData != "%") ):
                        self.writeBuffer(sock, receivedData, userBuffer)
                     elif ( (receivedData == "!") or (receivedData == "%") ):
                        # Detected the command terminator
                        self.writeBuffer(sock, receivedData, self.userBuffer)

                        # Copy the socket buffer in a working buffer (to avoid the buffer list manipulation)
                        buffer = self.readBuffer(sock, self.userBuffer)
                        # DEBUG - Copy the valid command into the global buffer
                        TCPServerCommand = buffer
                        # Send back ACK to client will be performed by the ProcessCommand class
                        #sock.send("OK"+chr(0x0D)+chr(0x0A)+" "+str(address[0])+" "+str(address[1]))
                        #self.sendACK(sock)
                        print("(DEBUG - TCPThreadedServer) Command --> " + buffer )
                        # Check if is a termination request
                        if (buffer == "$QUIT%"):
                           sock.close()
                           input.remove(sock)
                           self.stopflag = 1
                        # ---------------------------------------------------------
                        # Check if the command have a proper format
                        # $<CMD>:<DATA1>:<DATA2>%
                        # ---------------------------------------------------------
                        stringparsed = buffer.split(":")
                        numitem = len(stringparsed)
                        print("(DEBUG - TCPThreadedServer) Found: " + str(numitem) + " elements")
                        if (numitem == COMMAND_FIELD_NUMBER):
                           # Parse the command to find and split over each field
                           cmd_main = stringparsed[CMD_MAIN].strip()
                           cmd_main = cmd_main[1:]   # skip the starting char $
                           cmd_data1 = stringparsed[CMD_FIELD_1].strip()
                           cmd_data2 = stringparsed[CMD_FIELD_2].strip()
                           cmd_data2 = cmd_data2[:-1]   # skip the terminator %
                           if ( (cmd_main == "") or (cmd_data1 == "") or (cmd_data2 == "") ):
                              print "(DEBUG - TCPThreadedServer) Unknow command!"
                           else:
                              # Command frame structure is correct
                              print("(DEBUG - TCPThreadedServer) Valid command received")
                              print("(DEBUG - TCPThreadedServer) IP      : " + str(address[0]))
                              print("(DEBUG - TCPThreadedServer) PORT    : " + str(address[1]))
                              print("(DEBUG - TCPThreadedServer) Command : " + cmd_main)
                              print("(DEBUG - TCPThreadedServer) Field 1 : " + cmd_data1)
                              print("(DEBUG - TCPThreadedServer) Field 2 : " + cmd_data2)
                              
                              # Socket is open then I can store the new command into the command queue
                              # but if the socket was closed I don't have to store nothing
                              # Insert the command into the command queue
                              self.userCommand.put((sock,address[0], address[1],cmd_main,cmd_data1,cmd_data2))
                              self.userCommand.task_done()
                        else:
                           print("(DEBUG - TCPThreadedServer) Unknow command!")
                  else:
                     # I've to remove the user from the connected user list
                     self.userCommand.put((None,address[0], address[1],"LOGOUT","",""))
                     self.userCommand.task_done()
                     # Close the socket and remove sock from the input list
                     sock.close()
                     input.remove(sock)
                     self.removeBuffer(sock,self.userBuffer)
                     print("(DEBUG - TCPThreadedServer) Dropped connection from "+str(address[0])+":"+str(address[1]))
         pass         
      print ("(DEBUG - TCPThreadedServer) Exit from run\n")
      return
   
   def sendACK(self, sock):
      sock.send("$ACK%")
      return
      
   def sendNACK(self,sock):
      sock.send("$NACK%")
      return
   
   def createBuffer(self, sock, userBuffer):
      # Create the buffer for a new socket
      bufferList = userBuffer
      bufferList.append((sock,""))
      return
      
   def removeBuffer(self, sock, userBuffer):
      # Remove the buffer of a closed socket
      bufferList = userBuffer
      c = 0
      while c < len(bufferList):
         listptr = bufferList[c]
         if (sock == listptr[0]):
            del bufferList[c]
            break
         c = c + 1
      return
   
   def writeBuffer(self, sock, data, userBuffer):
      bufferList = userBuffer
      # For a socket insert the char into the related buffer
      tmp = ""
      c = 0
      while c < len(bufferList):
         listptr = bufferList[c]
         if (listptr[0] == sock):
            tmp = listptr[1]+data
            del bufferList[c]
            bufferList.append((sock,tmp))
            break
         c=c+1
      return
   
   def readBuffer(self, sock, userBuffer):
      # Read the buffer for the socket
      bufferList = userBuffer
      bufferRead=""
      c = 0
      while c < len(bufferList):
         listptr = bufferList[c]
         if (listptr[0] == sock):
            bufferRead = listptr[1]
            break
         c=c+1
      return(bufferRead)
      
   def deleteBuffer(self, sock, userBuffer):
      # Clear the buffer content of a socket
      bufferList = userBuffer
      tmp = ""
      c = 0
      while c < len(bufferList):
         listptr = bufferList[c]
         if (listptr[0] == sock):
            del bufferList[c]
            bufferList.append((sock,""))
            break
         c=c+1
      return
      
   def listBuffer(self, userBuffer):
      bufferList = userBuffer
      c = 0
      while c < len(bufferList):
         listptr = bufferList[c]
         print("buffer item #"+str(c)+" : " + str(listptr[0]) + ":"+listptr[1])
         c=c+1
      return
      
   def stopserver(self):
      print ("(DEBUG - TCPThreadedServer - stopserver) Server shutdown!")
      self.stopflag = 1
      return

class ProcessCommand(threading.Thread):
   def __init__(self, userConn, userCommand, userList, processToserver, **kwds):
      threading.Thread.__init__(self, **kwds)
      self.userConn = userConn
      self.userCommand = userCommand
      self.userList = userList
      self.processToserver = processToserver
      self.stopflag = 0
            
   def run(self):
      objptr = None
      sock = None
      listptr = None
      msg = ""
      # Extract the first item from the command queue
      while not(self.stopflag):
         while not self.userCommand.empty():
            # Get the pointer to the first element of the queue
            objptr = self.userCommand.get()
            # if sock is None I've to check if there is a related user to remove
            # or if not logged simply do nothing
            if (objptr[0] != None):
               # Extract from the object the single elements
               sock = objptr[0]
               clientip =objptr[1]
               clientport=objptr[2]
               cmd_main = str(objptr[3])
               cmd_data1 =str(objptr[4])
               cmd_data2=str(objptr[5])
               print ("(DEBUG - ProcessCommand) SOCKET INFO :", sock)
               print ("(DEBUG - ProcessCommand) Client IP   : " + str(clientip))
               print ("(DEBUG - ProcessCommand) Client PORT : " + str(clientport))
               print ("(DEBUG - ProcessCommand) COMMAND     : " + cmd_main)
               print ("(DEBUG - ProcessCommand) DATA FIELD 1: " + cmd_data1)
               print ("(DEBUG - ProcessCommand) DATA FIELD 2: " + cmd_data2)
               # Send back to the client the ACK
               #self.sendACK(sock)
               # ------------------------------
               # NOW WE CAN PROCESS THE COMMAND
               # ------------------------------
               # 1. Check if is a LOGIN command
               # ------------------------------
               if (cmd_main == "LOGIN"):
                  # 1. socket logged, same user name, no action taken (user is already logged)
                  # 2. socket not logged, but user name is used from another socket, not allowed (duplicate)
                  # 3. socket logged, user name not used and different from the previous login, not allowed (change user)
                  # 4. socket logged, user name is used from another socket, not allowed
                  # 5. socket isn't logged and also the user name isn't in use, try to register
                  
                  bFlagSocketFound = False
                  bFlagUserNameFound = False
                  bFlagAccountFound = False
                  c = 0
                  while c < len(self.userConn):
                     listptr = self.userConn[c]
                     if (listptr[0] == sock):
                        bFlagSocketFound = True
                     if (listptr[3] == cmd_data1):
                        bFlagUserNameFound = True
                     if ((listptr[0] == sock) and (listptr[3] == cmd_data1)):
                        bFlagAccountFound = True
                     c=c+1
                     
                  if bFlagAccountFound:
                     # 1. user is already connected on same socket
                     self.sendMessage(sock, "$LOGIN-EXISTING%")
                  else:
                     if bFlagSocketFound:
                        # -----------------
                        # socket logged
                        # -----------------
                        if bFlagUserNameFound:
                           # 4. socket logged, user name is used from another socket, not allowed
                           self.sendMessage(sock, "$NOT-ALLOWED-USER-USED%")
                        else:
                           # 3. socket logged, user name not used and different from previous login, not allowed (change user require first LOGOUT)
                           self.sendMessage(sock, "$NOT-ALLOWED-DIRECT-CHANGE-USER%")
                     else:
                        # -----------------
                        # socket not logged
                        # -----------------
                        if bFlagUserNameFound:
                           # 2. socket not logged, but user name is used from another socket, not allowed (duplicate)
                           self.sendMessage(sock, "$NOT-ALLOWED-USER-IN-USE%")
                           # Tell to server to remove the socket and close the connection with the client
                           self.processToserver.put((sock,"REMOVE"))
                           # Remove the current user, if is logged, from the userlist
                           c = 0
                           while c < len(self.userConn):
                              onerow = self.userConn[c]
                              if (sock == onerow[0]):
                                 del self.userConn[c]
                                 break
                              c = c + 1
                        else:
                           # 5. socket non logged and username is not already used
                           bFlagAuthorized = False
                           c = 0
                           while c < len(self.userList):
                              listptr = self.userList[c]
                              if ((listptr[0] == cmd_data1) and (listptr[1] == cmd_data2)):
                                 bFlagAuthorized = True
                                 break
                              c = c + 1
                              
                           if (bFlagAuthorized):
                              # Succesfully login
                              self.userConn.append((sock, clientip, clientport, cmd_data1, listptr[2]))
                              self.sendMessage(sock, "$LOGIN-SUCCESFULLY%")
                           else:
                              # The specified account is not listed into the authorized user list
                              self.sendMessage(sock, "$LOGIN-DENIED%")
                              # Tell to server to remove the socket and close the connection with the client
                              self.processToserver.put((sock,"REMOVE"))
               else:
                  # Command received was different than LOGIN
                  # then before any other things I've to check
                  # if the user is logged or not and take proper action
                  
                  print("(DEBUG - ProcessCommand) Not a login command")
                                       
                  c = 0
                  bFlagIsLogged = False
                  while c < len(self.userConn):
                     onerow = self.userConn[c]
                     if (sock == onerow[0]):
                        bFlagIsLogged = True
                     c = c + 1
                  
                  if (not bFlagIsLogged):
                     self.sendMessage(sock,"$LOGIN-NEEDED%")
                     # Tell to server to remove the socket and close the connection with the client
                     self.processToserver.put((sock,"REMOVE"))
                  else:
                     # ----------------------------------------------
                     # Authenticated user, hence I can analyze the command
                     # ----------------------------------------------
                     
                     # ------------------------------
                     # 2. Check if the LOGOUT command
                     # ------------------------------
                     if (cmd_main == "LOGOUT"):
                        bFlagUserRemoved = False
                        c = 0
                        while c < len(self.userConn):
                           onerow = self.userConn[c]
                           if (sock == onerow[0]):
                              # The logout command was for a logged socket
                              del self.userConn[c]
                              # Tell to server to remove the socket and close the connection with the client
                              self.processToserver.put((sock,"REMOVE"))
                              bFlagUserRemoved = True
                              break
                           c = c + 1
                        if (bFlagUserRemoved):
                           # User found and succesfully removed from the logged user list
                           print("(DEBUG - ProcessCommand) INFO: user #" + str(c) + " was removed!")
                        else:
                           # Normally I should not reach this statement!
                           print("(DEBUG - ProcessCommand) ERROR in processing the LOGOUT command!")
                     elif (cmd_main == "ACTIVEUSER"):
                        # ------------------------------
                        # 3. Check for the ACTIVEUSER command
                        # Enabled only for ADMIN user
                        # ------------------------------
                        if (self.userLevel(sock, self.userConn) == "ADMIN"):
                           # Print the connected user list
                           c = 0
                           while c < len(self.userConn):
                              msg = "User #"+str(c)+": " + str(self.userConn[c])
                              self.sendMessage(sock,msg)
                              c = c + 1
                        else:
                           msg = "$NOT-ALLOWED%"
                           self.sendMessage(sock,msg)
                     elif (cmd_main == "XXXX"):
                        # ------------------------------
                        # 4. Check for the XXXX command
                        # Enabled only for ADMIN/GUEST user
                        # ------------------------------
                        pass
                     else:
                        self.sendNACK(sock)
                     
            else:
               # The sock is equal to None then I've detected a forced logout, the socket was already closed
               # by the server section (user has close the connection without using the provided LOGOUT command).
               # Now I've just to clean the connected user list if the user was still logged or
               # if the user was not logged simply take no action at all.
               # Extract from the object the single elements
               print("(DEBUG - ProcessCommand) Socket pointer was closed without using the LOGOUT command")

               # Print the whole logged user list for debug
               c = 0
               while c < len(self.userConn):
                  print("User #"+str(c)+": " + str(self.userConn[c]) + "\n")
                  c = c + 1

               # Retrieve the current connection (socket) data
               sock = objptr[0]
               clientip =objptr[1]
               clientport=objptr[2]
               cmd_main = str(objptr[3])
               cmd_data1 =str(objptr[4])
               cmd_data2=str(objptr[5])
               if (cmd_main == "LOGOUT"):
                  # Scan the logged user list
                  bFlagUserLogged = False
                  c = 0
                  while c < len(self.userConn):
                     onerow = self.userConn[c]
                     if ((clientip == onerow[1]) and (clientport == onerow[2])):
                        # User was logged, clean the connected user list item
                        del self.userConn[c]
                        bFlagUserLogged = True
                        # Is not possibile to send back data because the socket was closed!
                        print("(DEBUG - ProcessCommand) Logout from " + str(clientip) + ":" + str(clientport)+" was succesfully!")
                        break
                     c = c + 1
                     
                  if (bFlagUserLogged):
                     print("(DEBUG - ProcessCommand) User was logged, it has simply closed the client side without using the LOGOUT command")
                  else:
                     print("(DEBUG - ProcessCommand) User was not logged, it has simply closed the client side")
               
               # Print the whole logged user list for debug
               c = 0
               while c < len(self.userConn):
                  print("User #"+str(c)+": " + str(self.userConn[c]) + "\n")
                  c = c + 1

      # End of the main while loop            
      print ("(DEBUG - ProcessCommand) Exit from run\n")
      return
   
   def sendACK(self, sock):
      sock.send("$ACK-FROM-PARSER%")
      return
      
   def sendNACK(self,sock):
      sock.send("$NACK-FROM-PARSER%")
      return
      
   def sendMessage(self,sock, msg):
      sock.send(msg)
      return
   
   def userLevel(self, sock, userConn):
      # Scan the user list and return the privilege level
      level = "GUEST"
      userList = userConn
      c = 0
      while c < len(userList):
         onerow = userList[c]
         if (sock == onerow[0]):
            level = onerow[4]
            break
         c = c + 1
      return(level)
   
   def stopparser(self):
      print ("(DEBUG - ProcessCommand - stopparser) Parser shutdown!")
      self.stopflag = 1
      return
      
if __name__ == "__main__":

   print("Nonblocking sockets example!")
   print("Listening on port 2626")
   
   try:
      # Setup a TCP server
      TCPserver = TCPThreadedServer(2626, userCommand, processToserver, userBuffer)
      TCPserver.setDaemon(True)
      TCPserver.start()
         
      # initialize the command buffer input
      # used only for debug purpose to manage command
      # we have to use the proper queue and command parser
      TCPServerCommand = ""
      
      # Setup the command parser thread
      TCPCmdParser = ProcessCommand(userConn, userCommand, userList, processToserver)
      TCPCmdParser.setDaemon(True)
      TCPCmdParser.start()
      
      # Infinite loop (keep alive the task)
      while 1:
         if (TCPServerCommand != ""):
            print("(DEBUG - main) New command! " + TCPServerCommand)
            if (TCPServerCommand == "$QUIT%"): break
            TCPServerCommand = ""
         pass
         
   except (KeyboardInterrupt, SystemExit):
      print("(main) Exception detected")
      TCPserver.stopserver()
      TCPCmdParser.stopparser()
   finally:
      time.sleep(1)
      print("(main) Exiting from main program")
      sys.exit()
Attachments
nonblocking14.zip
TCP Server (socket based) with simple command responce protocol
(6.17 KiB) Downloaded 62 times
Grinch
 
Posts: 10
Joined: Wed Sep 04, 2013 10:00 am

Re: Nonblocking sockets

Postby Grinch » Wed Sep 25, 2013 12:01 pm

Dear All,
I've noticed that someone have downloaded the source code, I like to know what is your idea about the code organization and the basic idea that I've used to code the server and the command processor. I'm a beginner with Python language and I would like to know all your suggestions and critics about the code logic and also if this can be improved.

Thanks in advance.

Powermos
Grinch
 
Posts: 10
Joined: Wed Sep 04, 2013 10:00 am


Return to Tutorials

Who is online

Users browsing this forum: No registered users and 4 guests