Share List Between Processes In Python Server
Solution 1:
The problem is that you're letting the main process finish its execution immediately after you start the worker process. When the process that created the multiprocessing.Manager finishes its execution, the Manager server gets shut down, which means your shared list object is now useless. This happens because the Manager object registers its shutdown function as a "finalizer" with the multiprocessing module, which means it will be run just before the process exits. Here's the code that registers it, in BaseManager.__init__:
# register a finalizer
self._state.value = State.STARTED
self.shutdown = util.Finalize(
    self, type(self)._finalize_manager,
    args=(self._process, self._address, self._authkey,
          self._state, self._Client),
    exitpriority=0
)
Here's the code that actually does the shutdown:
@staticmethod
def _finalize_manager(process, address, authkey, state, _Client):
    '''
    Shutdown the manager process; will be registered as a finalizer
    '''
    if process.is_alive():
        util.info('sending shutdown message to manager')
        try:
            conn = _Client(address, authkey=authkey)
            try:
                dispatch(conn, None, 'shutdown')
            finally:
                conn.close()
        except Exception:
            pass

        process.join(timeout=1.0)
        if process.is_alive():
            util.info('manager still alive')
            if hasattr(process, 'terminate'):
                util.info('trying to `terminate()` manager process')
                process.terminate()
                process.join(timeout=0.1)
                if process.is_alive():
                    util.info('manager still alive after terminate')

    state.value = State.SHUTDOWN
    try:
        del BaseProxy._address_to_local[address]
    except KeyError:
        pass
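For illustration, here's a minimal sketch (my example, not from the question) of that failure mode: the parent script ends, the finalizer above shuts the Manager down, and the still-running child can no longer use its proxy.

import multiprocessing
import time

def worker(shared):
    time.sleep(1)           # by now the parent script has finished
    shared.append('hello')  # raises a connection error: the Manager is gone

if __name__ == '__main__':
    shared = multiprocessing.Manager().list()
    multiprocessing.Process(target=worker, args=(shared,)).start()
    # no join() here: the parent's exit handlers shut the Manager down
    # before the non-daemon child gets to use the shared list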
The fix is simple: don't let the main process complete immediately after you start the process that runs the UDP server. Call server_process.join():
import multiprocessing
from socketserver import UDPServer, ForkingMixIn, DatagramRequestHandler
from socket import socket, AF_INET, SOCK_DGRAM
from settings import host, port, number_of_connections

class ChatHandler(DatagramRequestHandler):
    def handle(self):
        cur_process = multiprocessing.current_process()
        data = self.request[0].strip()
        socket = self.request[1]
        ChatHandler.clients.append(self.client_address)  # error here
        print(ChatHandler.clients)

class ChatServer(ForkingMixIn, UDPServer):
    pass

if __name__ == '__main__':
    server = ChatServer((host, port), ChatHandler)
    ChatHandler.clients = multiprocessing.Manager().list()
    server_process = multiprocessing.Process(target=server.serve_forever)
    server_process.daemon = False
    server_process.start()
    server_process.join()  # This fixes the issue.
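To exercise the handler, a quick hypothetical test client (reusing the same settings module the server imports) could look like this:

from socket import socket, AF_INET, SOCK_DGRAM
from settings import host, port

client = socket(AF_INET, SOCK_DGRAM)
client.sendto(b'hello', (host, port))  # triggers ChatHandler.handle()
client.close()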
Solution 2:
The following shows an example of a UDP server and a shared list.

- The parent code creates a Manager and a managed list, and passes it to start_server().
- That function in turn actually starts the server, storing the shared list such that the server (and its handler) can access it.
- When a packet arrives, the handle() method is triggered. It accesses the server using self.server, and the shared list with self.server.client_list, an attribute on the ChatServer instance.

I tested by starting the server, waiting a second, then sending the UDP packet "beer" using the netcat command. For some reason it sends Xs first (most likely netcat's UDP port probes), and each output is duplicated. This is a bug, but the code should point you in the right direction.
source
import multiprocessing as mp, signal, sys
from SocketServer import (
    UDPServer, ForkingMixIn, DatagramRequestHandler
)

class ChatHandler(DatagramRequestHandler):
    def handle(self):
        data, _socket = self.request
        curproc = mp.current_process()
        print '{}: {}'.format(
            curproc,
            dict(
                data_len=len(data),
                data=data.strip(),
                client=self.client_address,
            ))
        self.server.client_list.append(
            self.client_address)
        print '{}: {}'.format(
            curproc,
            dict(client_list=self.server.client_list),
        )

class ChatServer(ForkingMixIn, UDPServer):
    client_list = None

def start_server(client_list):
    server = ChatServer(('', 9876), ChatHandler)
    server.client_list = client_list
    server.serve_forever()

if __name__ == '__main__':
    clist = mp.Manager().list()
    mp.Process(
        target=start_server, args=[clist],
        name='udpserver',
    ).start()
    signal.alarm(5)   # die in 5 seconds
    signal.pause()    # wait for control-C or alarm
test run
(sleep 1 ; echo beer | nc -vvu localhost 9876 ) &
python ./mshared.py
<Process(udpserver, started)>: {'data': 'X', 'client': ('127.0.0.1', 49399), 'data_len': 1}
<Process(udpserver, started)>: {'data': 'X', 'client': ('127.0.0.1', 49399), 'data_len': 1}
<Process(udpserver, started)>: {'client_list': <ListProxy object, typeid 'list' at 0x1774650>}
<Process(udpserver, started)>: {'client_list': <ListProxy object, typeid 'list' at 0x1774650>}
<Process(udpserver, started)>: {'data': 'X', 'client': ('127.0.0.1', 49399), 'data_len': 1}
<Process(udpserver, started)>: {'client_list': <ListProxy object, typeid 'list' at 0x1774650>}
<Process(udpserver, started)>: {'data': 'X', 'client': ('127.0.0.1', 49399), 'data_len': 1}
<Process(udpserver, started)>: {'client_list': <ListProxy object, typeid 'list' at 0x1774650>}
Connection to localhost 9876 port [udp/*] succeeded!
<Process(udpserver, started)>: {'data': 'X', 'client': ('127.0.0.1', 49399), 'data_len': 1}
<Process(udpserver, started)>: {'client_list': <ListProxy object, typeid 'list' at 0x1774650>}
<Process(udpserver, started)>: {'data': 'beer', 'client': ('127.0.0.1', 49399), 'data_len': 5}
<Process(udpserver, started)>: {'client_list': <ListProxy object, typeid 'list' at 0x1774650>}
Solution 3:
If you're creating a Pool like the following, look at the length of the list you're passing (or at any hardcoded worker count); it might exceed your machine's capability:

from multiprocessing import Pool

pool = Pool(len(somelist))
# call the function 'somefunction' in parallel for each element of somelist
pool.map(somefunction, somelist)

Reducing the number of workers resolved the issue for me.
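A minimal sketch of the capped version (somefunction and somelist are placeholders standing in for whatever you are mapping over):

import os
from multiprocessing import Pool

def somefunction(item):
    return item * 2  # placeholder work

somelist = list(range(1000))

if __name__ == '__main__':
    # never spawn more workers than CPUs, regardless of list length
    with Pool(processes=min(len(somelist), os.cpu_count())) as pool:
        results = pool.map(somefunction, somelist)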
Solution 4:
You can use Python's native multiprocessing.shared_memory module (Python 3.8+), e.g. its ShareableList.
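A minimal ShareableList sketch (my example, not from the original answer; note that a ShareableList has a fixed length and fixed per-item sizes):

from multiprocessing import Process
from multiprocessing.shared_memory import ShareableList

def worker(name):
    # attach to the existing block by name and mutate it in place
    sl = ShareableList(name=name)
    sl[0] = 1
    sl.shm.close()

if __name__ == '__main__':
    sl = ShareableList([0, 0])
    p = Process(target=worker, args=(sl.shm.name,))
    p.start()
    p.join()
    print(list(sl))  # [1, 0]
    sl.shm.close()
    sl.shm.unlink()  # release the underlying shared memory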
Or this:
import multiprocessing

manager = multiprocessing.Manager()
shared_list = manager.list()

def worker1(l):
    l.append(1)

def worker2(l):
    l.append(2)

process1 = multiprocessing.Process(
    target=worker1, args=[shared_list])
process2 = multiprocessing.Process(
    target=worker2, args=[shared_list])

process1.start()
process2.start()
process1.join()
process2.join()

print(shared_list)
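Note that on platforms defaulting to the spawn start method (Windows, and macOS on recent Python versions), the process-creating code above must sit under an if __name__ == '__main__': guard.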
Solution 5:
If you can't use the manager for whatever reason, you can also implement one on your own that fits your needs.
My unittest was configured to stop all leftover child processes if they weren't shut down properly as expected, which destroyed the manager. So I needed something that can be started and stopped arbitrarily without bothering the tests.
import multiprocessing
import atexit
import select

class SharedDict:
    """Share a dictionary across processes."""

    def __init__(self):
        """Create a shared dictionary."""
        super().__init__()
        self.pipe = multiprocessing.Pipe()
        self.process = None
        atexit.register(self._stop)
        self._start()

    def _start(self):
        """Ensure the process to manage the dictionary is running."""
        if self.process is not None and self.process.is_alive():
            return

        # if the manager has already been running in the past but stopped
        # for some reason, the dictionary contents are lost
        self.process = multiprocessing.Process(target=self.manage)
        self.process.start()

    def manage(self):
        """Manage the dictionary, handle read and write requests."""
        shared_dict = dict()
        while True:
            message = self.pipe[0].recv()
            # 'spam' is a custom log level from the author's project;
            # substitute logging.debug if you don't have it
            logger.spam('SharedDict got %s', message)
            if message[0] == 'stop':
                return

            if message[0] == 'set':
                shared_dict[message[1]] = message[2]

            if message[0] == 'get':
                self.pipe[0].send(shared_dict.get(message[1]))

    def _stop(self):
        """Stop the managing process."""
        self.pipe[1].send(('stop',))

    def get(self, key):
        """Get a value from the dictionary."""
        return self.__getitem__(key)

    def __setitem__(self, key, value):
        self.pipe[1].send(('set', key, value))

    def __getitem__(self, key):
        self.pipe[1].send(('get', key))

        # to avoid blocking forever if something goes wrong
        select.select([self.pipe[1]], [], [], 0.1)
        if self.pipe[1].poll():
            return self.pipe[1].recv()

        return None

    def __del__(self):
        self._stop()

shared_dict = SharedDict()
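A quick usage sketch with hypothetical keys; each access is one round trip through the managing process:

shared_dict['answer'] = 42         # handled by the 'set' branch in manage()
print(shared_dict['answer'])       # 42, sent back through the pipe
print(shared_dict.get('missing'))  # None: the key is not present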
You can extend this with all sorts of methods, and you can stop and restart it whenever you like (though the dict will be lost each time). The pipes will remain the same all the time, so all child processes can also talk to the restarted manager without the need for new pipe fds.
I might extend this stuff with more functionality. If I didn't move that class into its own module in the meantime, it can be found at https://github.com/sezanzeb/key-mapper/blob/main/keymapper/injection/macros.py