
Share List Between Process In Python Server

I have simple UDPServer, which works with multiprocessing. I want to create a list, that contains information about all clients. I use Manager, but I don't understand, how to app

Solution 1:

The problem is that you're letting the main process finish immediately after you start the worker process. When the process that created the multiprocessing.Manager exits, the Manager server is shut down, which means your shared list object is now useless. This happens because the Manager object registers its shutdown function as a "finalizer" with the multiprocessing module, so it runs just before the process exits. Here's the code that registers it, in BaseManager.start():

    # register a finalizer
    self._state.value = State.STARTED
    self.shutdown = util.Finalize(
        self, type(self)._finalize_manager,
        args=(self._process, self._address, self._authkey,
              self._state, self._Client),
        exitpriority=0
        )

Here's the code that actually does the shut down:

@staticmethod
def _finalize_manager(process, address, authkey, state, _Client):
    '''
    Shutdown the manager process; will be registered as a finalizer
    '''
    if process.is_alive():
        util.info('sending shutdown message to manager')
        try:
            conn = _Client(address, authkey=authkey)
            try:
                dispatch(conn, None, 'shutdown')
            finally:
                conn.close()
        except Exception:
            pass

        process.join(timeout=1.0)
        if process.is_alive():
            util.info('manager still alive')
            if hasattr(process, 'terminate'):
                util.info('trying to `terminate()` manager process')
                process.terminate()
                process.join(timeout=0.1)
                if process.is_alive():
                    util.info('manager still alive after terminate')

    state.value = State.SHUTDOWN
    try:
        del BaseProxy._address_to_local[address]
    except KeyError:
        pass
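The effect of this finalizer can be reproduced without exiting the process by calling shutdown() explicitly. The following is a minimal sketch, not code from the question: the function name use_after_shutdown and the "client-1" / "client-2" values are made up for illustration, and the fork start method is requested explicitly on the assumption of a Unix-like system (which ForkingMixIn needs anyway).

```python
import multiprocessing as mp


def use_after_shutdown():
    """Show that a managed list stops working once its manager is gone."""
    ctx = mp.get_context("fork")  # assumption: Unix-like system
    manager = ctx.Manager()       # starts the manager server process
    shared = manager.list()
    shared.append("client-1")
    before = list(shared)         # copy the contents while the server is alive

    manager.shutdown()            # the same finalizer that runs at process exit

    try:
        shared.append("client-2")  # the proxy has no server to talk to
    except Exception as exc:       # the exact exception type varies
        return before, type(exc).__name__
    return before, None


if __name__ == "__main__":
    print(use_after_shutdown())
```

The exception type is deliberately not pinned down here, since it depends on how the broken connection is detected.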

The fix is simple: don't let the main process exit right after you start the process that runs the UDP server. Call server_process.join() so the main process waits for it:

import multiprocessing
from socketserver import UDPServer, ForkingMixIn, DatagramRequestHandler
from socket import socket, AF_INET, SOCK_DGRAM
from settings import host, port, number_of_connections

class ChatHandler(DatagramRequestHandler):

    def handle(self):
        cur_process = multiprocessing.current_process()
        data = self.request[0].strip()
        socket = self.request[1]
        ChatHandler.clients.append(self.client_address) # error here
        print(ChatHandler.clients)


class ChatServer(ForkingMixIn, UDPServer):
    pass


if __name__ == '__main__':
    server = ChatServer((host, port), ChatHandler)
    ChatHandler.clients = multiprocessing.Manager().list()
    server_process = multiprocessing.Process(target=server.serve_forever)
    server_process.daemon = False
    server_process.start()
    server_process.join() # This fixes the issue.

Solution 2:

The following shows an example of a UDP server and a shared list.

  • the parent code creates a Manager and a managed list, and passes the list to start_server()

  • this function in turn actually starts the server, storing the shared list such that the server -- and its handler -- can access it

  • when a packet arrives, the handle() method is triggered. This accesses the server using self.server, and the shared list with self.server.client_list, an attribute on the ChatServer instance.
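The steps above can be sketched for Python 3 roughly as follows. This is an illustration under assumptions, not the answer's code: it uses a plain UDPServer that handles a single request instead of ForkingMixIn (so forking behaviour is not exercised), binds to an ephemeral port on 127.0.0.1, and the names serve_once, port_queue and shared_list_demo are invented. The fork start method assumes a Unix-like system.

```python
import multiprocessing as mp
import socket
from socketserver import UDPServer, DatagramRequestHandler


class ChatHandler(DatagramRequestHandler):
    def handle(self):
        # self.server is the ChatServer instance that received the datagram
        self.server.client_list.append(self.client_address)


class ChatServer(UDPServer):
    client_list = None  # replaced with the managed list before serving


def serve_once(client_list, port_queue):
    """Bind to a free port, report it, and handle exactly one datagram."""
    with ChatServer(("127.0.0.1", 0), ChatHandler) as server:
        server.client_list = client_list
        port_queue.put(server.server_address[1])
        server.handle_request()


def shared_list_demo():
    ctx = mp.get_context("fork")  # assumption: Unix-like system
    with ctx.Manager() as manager:
        clients = manager.list()
        port_queue = ctx.Queue()
        proc = ctx.Process(target=serve_once, args=(clients, port_queue))
        proc.start()
        port = port_queue.get(timeout=5)  # wait until the server is bound
        with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
            sock.sendto(b"beer\n", ("127.0.0.1", port))
        proc.join(timeout=5)
        if proc.is_alive():  # datagram lost; do not hang forever
            proc.terminate()
        # copy the contents out while the manager is still running
        return list(clients)


if __name__ == "__main__":
    print(shared_list_demo())
```

The list is copied with list(clients) before the with block ends, because the proxy is unusable once the manager has shut down.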

I tested this by starting the server, waiting a second, then sending a UDP packet containing "beer" with the netcat command. For some reason it sends Xs first, and each output is duplicated. This is a bug, but the code should point you in the right direction.

source

import multiprocessing as mp, signal, sys
from SocketServer import (  # Python 2 name; the module is socketserver on Python 3
    UDPServer, ForkingMixIn, DatagramRequestHandler
)

class ChatHandler(DatagramRequestHandler):
    def handle(self):
        data, _socket = self.request
        curproc = mp.current_process()
        print('{}: {}'.format(
            curproc,
            dict(
                data_len=len(data),
                data=data.strip(),
                client=self.client_address,
            )))
        self.server.client_list.append(
            self.client_address)
        print('{}: {}'.format(
            curproc,
            dict(client_list=self.server.client_list),
        ))

class ChatServer(ForkingMixIn, UDPServer):
    client_list = None


def start_server(client_list):
    server = ChatServer(('', 9876), ChatHandler)
    server.client_list = client_list
    server.serve_forever()

if __name__ == '__main__':
    clist = mp.Manager().list()
    mp.Process(
        target=start_server, args=[clist],
        name='udpserver',
    ).start()

    signal.alarm(5)             # die in 5 seconds
    signal.pause()              # wait for control-C or alarm

test run

(sleep 1 ; echo beer | nc -vvu localhost 9876 ) &
python ./mshared.py

<Process(udpserver, started)>: {'data': 'X', 'client': ('127.0.0.1', 49399), 'data_len': 1}
<Process(udpserver, started)>: {'data': 'X', 'client': ('127.0.0.1', 49399), 'data_len': 1}
<Process(udpserver, started)>: {'client_list': <ListProxy object, typeid 'list' at 0x1774650>}
<Process(udpserver, started)>: {'client_list': <ListProxy object, typeid 'list' at 0x1774650>}
<Process(udpserver, started)>: {'data': 'X', 'client': ('127.0.0.1', 49399), 'data_len': 1}
<Process(udpserver, started)>: {'client_list': <ListProxy object, typeid 'list' at 0x1774650>}
<Process(udpserver, started)>: {'data': 'X', 'client': ('127.0.0.1', 49399), 'data_len': 1}
<Process(udpserver, started)>: {'client_list': <ListProxy object, typeid 'list' at 0x1774650>}
Connection to localhost 9876 port [udp/*] succeeded!
<Process(udpserver, started)>: {'data': 'X', 'client': ('127.0.0.1', 49399), 'data_len': 1}
<Process(udpserver, started)>: {'client_list': <ListProxy object, typeid 'list' at 0x1774650>}
<Process(udpserver, started)>: {'data': 'beer', 'client': ('127.0.0.1', 49399), 'data_len': 5}
<Process(udpserver, started)>: {'client_list': <ListProxy object, typeid 'list' at 0x1774650>}

Solution 3:

If you are creating a Pool as shown below, check the length of the list you pass in (or the hard-coded worker count). It may exceed what your machine can handle:

        pool = Pool(len(somelist))
        # call the function 'somefunction' in parallel for each somelist.
        pool.map(somefunction, somelist)

Reducing the number of workers resolved the issue for me.
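One way to keep the worker count in check is to cap it at the number of CPUs. The sketch below is only an illustration: square, capped_worker_count and run_capped are hypothetical names, the default limit of os.cpu_count() is an assumption about what is reasonable, and the fork start method assumes a Unix-like system.

```python
import multiprocessing as mp
import os


def square(x):
    return x * x


def capped_worker_count(n_items, limit=None):
    """Return a pool size no larger than the item count or the limit."""
    if limit is None:
        limit = os.cpu_count() or 1
    return max(1, min(n_items, limit))


def run_capped(items, limit=None):
    """Map square over items with a bounded number of workers."""
    workers = capped_worker_count(len(items), limit)
    with mp.get_context("fork").Pool(workers) as pool:  # assumption: Unix
        return pool.map(square, items)


if __name__ == "__main__":
    print(run_capped(list(range(10)), limit=4))
```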

Solution 4:

You can use the standard-library module multiprocessing.shared_memory (Python 3.8+), which provides SharedMemory and ShareableList.
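The shared-memory route might look like this with ShareableList from multiprocessing.shared_memory (Python 3.8+). It is a minimal sketch: the function name shareable_list_demo and the values are made up, and the second handle is opened in the same process purely to show attaching by name, which is what another process would do. Note that a ShareableList has a fixed length, so it cannot grow with append() the way a managed list can.

```python
from multiprocessing import shared_memory


def shareable_list_demo():
    """Write through one handle and read the value through another."""
    owner = shared_memory.ShareableList([0, 0, 0])  # fixed length of 3
    try:
        # Another process would attach with the same name.
        other = shared_memory.ShareableList(name=owner.shm.name)
        other[0] = 42
        value = owner[0]       # the write is visible through the first handle
        other.shm.close()
        return value
    finally:
        owner.shm.close()
        owner.shm.unlink()     # release the shared block


if __name__ == "__main__":
    print(shareable_list_demo())
```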

Or this:

import multiprocessing


def worker1(l):
    l.append(1)


def worker2(l):
    l.append(2)


if __name__ == '__main__':
    # Create the manager under the main guard so that child processes
    # do not try to start their own when they import this module.
    manager = multiprocessing.Manager()
    shared_list = manager.list()

    process1 = multiprocessing.Process(
        target=worker1, args=[shared_list])
    process2 = multiprocessing.Process(
        target=worker2, args=[shared_list])

    process1.start()
    process2.start()
    process1.join()
    process2.join()

    print(shared_list)

Solution 5:

If you can't use the manager for whatever reason, you can also implement one on your own that fits your needs.

My unittest was configured to stop all child processes that are left over if they are not properly shut down as expected, which destroyed the manager. So I needed something that can be arbitrarily started and stopped without bothering the tests.

import multiprocessing
import atexit
import select

class SharedDict:
    """Share a dictionary across processes."""

    def __init__(self):
        """Create a shared dictionary."""
        super().__init__()
        self.pipe = multiprocessing.Pipe()
        self.process = None
        atexit.register(self._stop)
        self._start()

    def _start(self):
        """Ensure the process to manage the dictionary is running."""
        if self.process is not None and self.process.is_alive():
            return

        # if the manager has already been running in the past but stopped
        # for some reason, the dictionary contents are lost
        self.process = multiprocessing.Process(target=self.manage)
        self.process.start()

    def manage(self):
        """Manage the dictionary, handle read and write requests."""
        shared_dict = dict()
        while True:
            message = self.pipe[0].recv()
            # `logger` is not defined in this snippet; it comes from the
            # surrounding project
            logger.spam('SharedDict got %s', message)

            if message[0] == 'stop':
                return

            if message[0] == 'set':
                shared_dict[message[1]] = message[2]

            if message[0] == 'get':
                self.pipe[0].send(shared_dict.get(message[1]))

    def _stop(self):
        """Stop the managing process."""
        self.pipe[1].send(('stop',))

    def get(self, key):
        """Get a value from the dictionary."""
        return self.__getitem__(key)

    def __setitem__(self, key, value):
        self.pipe[1].send(('set', key, value))

    def __getitem__(self, key):
        self.pipe[1].send(('get', key))

        # to avoid blocking forever if something goes wrong
        select.select([self.pipe[1]], [], [], 0.1)
        if self.pipe[1].poll():
            return self.pipe[1].recv()

        return None

    def __del__(self):
        self._stop()


shared_dict = SharedDict()

You can extend this with all sorts of methods, and you can stop and restart it whenever you like (though the dict will be lost each time). The pipes will remain the same all the time, so all child processes can also talk to the restarted manager without the need for new pipe fds.
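The message-passing idea, including one extra method, can be reduced to a few lines. This is a simplified sketch rather than the class above: the serve and pipe_dict_demo names and the "keys" message are invented for illustration, only a single client is assumed, and the fork start method assumes a Unix-like system.

```python
import multiprocessing as mp


def serve(conn):
    """Own a private dict and answer tuple-shaped requests over a pipe."""
    store = {}
    while True:
        op, *args = conn.recv()
        if op == "stop":
            return
        if op == "set":
            store[args[0]] = args[1]
        elif op == "get":
            conn.send(store.get(args[0]))
        elif op == "keys":  # example of an added method
            conn.send(sorted(store))


def pipe_dict_demo():
    ctx = mp.get_context("fork")  # assumption: Unix-like system
    client, server = ctx.Pipe()
    proc = ctx.Process(target=serve, args=(server,))
    proc.start()

    client.send(("set", "a", 1))
    client.send(("get", "a"))
    # poll with a timeout to avoid blocking forever if something goes wrong
    value = client.recv() if client.poll(5) else None
    client.send(("keys",))
    keys = client.recv() if client.poll(5) else None

    client.send(("stop",))
    proc.join(timeout=5)
    if proc.is_alive():
        proc.terminate()
    return value, keys


if __name__ == "__main__":
    print(pipe_dict_demo())
```

With several client processes sharing one pipe end, replies could be read by the wrong caller, so a real implementation would likely need a lock or one pipe per client.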

I might extend this stuff with more functionality. If I didn't move that class into its own module in the meantime, it can be found at https://github.com/sezanzeb/key-mapper/blob/main/keymapper/injection/macros.py
