Manager().Queue() cannot be pickled












0














My multiprocessing use-case involves a handful of processes adding data to four queues, and one process per queue working on that data. Because I need to have multiple processes adding to and removing from each queue, it looked like multiprocessing.Manager().Queue() was the right tool for the job.



However, when I spawn my worker processes, I get can't pickle _thread.rlock objects when executing mp.Process(target=my_method, args=(a,queue,)).start().



Here's basically what I'm doing:



import multiprocessing as mp

def create_managers():
destination_managers = {}
for k in desired_queues:
destination_managers[k] = {'man': mp.Manager()}
destination_managers[k]['q'] = destination_managers[k]['man'].Queue()
return destination_managers

def add_to_queue(queue, source):
data = get_item_from_source(source)
queue.put(data)

def getn(q,n):
result = [q.get()]
try:
while len(result) < n:
result.append(q.get(block=False))
except managers.queue.Empty:
pass
return result

def do_work(queue):
while True:
try:
next_chunk = getn(queue,1000) # Get next 1,000 items from queue.
result =
for i in next_chunk:
# Operate on the data
processed_data = process_the_data(next_chunk[i])
result.append(processed_data)
try:
# Upload the processed chunk to a database
insert_to_db(result)
except:
# Sometimes the uploads fail due to unreliable network. Put the data back in the queue so we try again later.
for i in next_chunk:
queue.put(next_chunk[row])
except queue.empty:
pass

def spawn_putters(dest_mgrs, source_list):
putters =
for k in source_list:
proc = mp.Process(target=add_to_queue, args=(dest_mgrs[source_dest_map[k]]['q'],source_list[k],)
putters.append(proc)
proc.start()
return putters

def spawn_workers(dest_mgrs):
workers =
for k in dest_mgrs:
proc = mp.Process(target=do_work,args=(dest_mgrs[k]['q'],)
workers.append(proc)
proc.start()
return workers

if __name__ == "__main__":
source_list #is defined
destination_managers = create_managers()
putters = spawn_putters(destination_managers, source_list)
workers = spawn_workers(destination_managers)


How do I ensure the child processes can operate on the queue?










share|improve this question
























  • Can you create a Minimal, Complete, and Verifiable example? The code above can't be executed. There are a number of syntax errors (missing parentheses) and undefined variables. Just fake enough data to reproduce the error. Post the traceback as well.
    – Mark Tolonen
    Nov 14 at 17:21


















0














My multiprocessing use-case involves a handful of processes adding data to four queues, and one process per queue working on that data. Because I need to have multiple processes adding to and removing from each queue, it looked like multiprocessing.Manager().Queue() was the right tool for the job.



However, when I spawn my worker processes, I get can't pickle _thread.rlock objects when executing mp.Process(target=my_method, args=(a,queue,)).start().



Here's basically what I'm doing:



import multiprocessing as mp

def create_managers():
destination_managers = {}
for k in desired_queues:
destination_managers[k] = {'man': mp.Manager()}
destination_managers[k]['q'] = destination_managers[k]['man'].Queue()
return destination_managers

def add_to_queue(queue, source):
data = get_item_from_source(source)
queue.put(data)

def getn(q,n):
result = [q.get()]
try:
while len(result) < n:
result.append(q.get(block=False))
except managers.queue.Empty:
pass
return result

def do_work(queue):
while True:
try:
next_chunk = getn(queue,1000) # Get next 1,000 items from queue.
result =
for i in next_chunk:
# Operate on the data
processed_data = process_the_data(next_chunk[i])
result.append(processed_data)
try:
# Upload the processed chunk to a database
insert_to_db(result)
except:
# Sometimes the uploads fail due to unreliable network. Put the data back in the queue so we try again later.
for i in next_chunk:
queue.put(next_chunk[row])
except queue.empty:
pass

def spawn_putters(dest_mgrs, source_list):
putters =
for k in source_list:
proc = mp.Process(target=add_to_queue, args=(dest_mgrs[source_dest_map[k]]['q'],source_list[k],)
putters.append(proc)
proc.start()
return putters

def spawn_workers(dest_mgrs):
workers =
for k in dest_mgrs:
proc = mp.Process(target=do_work,args=(dest_mgrs[k]['q'],)
workers.append(proc)
proc.start()
return workers

if __name__ == "__main__":
source_list #is defined
destination_managers = create_managers()
putters = spawn_putters(destination_managers, source_list)
workers = spawn_workers(destination_managers)


How do I ensure the child processes can operate on the queue?










share|improve this question
























  • Can you create a Minimal, Complete, and Verifiable example? The code above can't be executed. There are a number of syntax errors (missing parentheses) and undefined variables. Just fake enough data to reproduce the error. Post the traceback as well.
    – Mark Tolonen
    Nov 14 at 17:21
















0












0








0







My multiprocessing use-case involves a handful of processes adding data to four queues, and one process per queue working on that data. Because I need to have multiple processes adding to and removing from each queue, it looked like multiprocessing.Manager().Queue() was the right tool for the job.



However, when I spawn my worker processes, I get can't pickle _thread.rlock objects when executing mp.Process(target=my_method, args=(a,queue,)).start().



Here's basically what I'm doing:



import multiprocessing as mp

def create_managers():
destination_managers = {}
for k in desired_queues:
destination_managers[k] = {'man': mp.Manager()}
destination_managers[k]['q'] = destination_managers[k]['man'].Queue()
return destination_managers

def add_to_queue(queue, source):
data = get_item_from_source(source)
queue.put(data)

def getn(q,n):
result = [q.get()]
try:
while len(result) < n:
result.append(q.get(block=False))
except managers.queue.Empty:
pass
return result

def do_work(queue):
while True:
try:
next_chunk = getn(queue,1000) # Get next 1,000 items from queue.
result =
for i in next_chunk:
# Operate on the data
processed_data = process_the_data(next_chunk[i])
result.append(processed_data)
try:
# Upload the processed chunk to a database
insert_to_db(result)
except:
# Sometimes the uploads fail due to unreliable network. Put the data back in the queue so we try again later.
for i in next_chunk:
queue.put(next_chunk[row])
except queue.empty:
pass

def spawn_putters(dest_mgrs, source_list):
putters =
for k in source_list:
proc = mp.Process(target=add_to_queue, args=(dest_mgrs[source_dest_map[k]]['q'],source_list[k],)
putters.append(proc)
proc.start()
return putters

def spawn_workers(dest_mgrs):
workers =
for k in dest_mgrs:
proc = mp.Process(target=do_work,args=(dest_mgrs[k]['q'],)
workers.append(proc)
proc.start()
return workers

if __name__ == "__main__":
source_list #is defined
destination_managers = create_managers()
putters = spawn_putters(destination_managers, source_list)
workers = spawn_workers(destination_managers)


How do I ensure the child processes can operate on the queue?










share|improve this question















My multiprocessing use-case involves a handful of processes adding data to four queues, and one process per queue working on that data. Because I need to have multiple processes adding to and removing from each queue, it looked like multiprocessing.Manager().Queue() was the right tool for the job.



However, when I spawn my worker processes, I get can't pickle _thread.rlock objects when executing mp.Process(target=my_method, args=(a,queue,)).start().



Here's basically what I'm doing:



import multiprocessing as mp

def create_managers():
destination_managers = {}
for k in desired_queues:
destination_managers[k] = {'man': mp.Manager()}
destination_managers[k]['q'] = destination_managers[k]['man'].Queue()
return destination_managers

def add_to_queue(queue, source):
data = get_item_from_source(source)
queue.put(data)

def getn(q,n):
result = [q.get()]
try:
while len(result) < n:
result.append(q.get(block=False))
except managers.queue.Empty:
pass
return result

def do_work(queue):
while True:
try:
next_chunk = getn(queue,1000) # Get next 1,000 items from queue.
result =
for i in next_chunk:
# Operate on the data
processed_data = process_the_data(next_chunk[i])
result.append(processed_data)
try:
# Upload the processed chunk to a database
insert_to_db(result)
except:
# Sometimes the uploads fail due to unreliable network. Put the data back in the queue so we try again later.
for i in next_chunk:
queue.put(next_chunk[row])
except queue.empty:
pass

def spawn_putters(dest_mgrs, source_list):
putters =
for k in source_list:
proc = mp.Process(target=add_to_queue, args=(dest_mgrs[source_dest_map[k]]['q'],source_list[k],)
putters.append(proc)
proc.start()
return putters

def spawn_workers(dest_mgrs):
workers =
for k in dest_mgrs:
proc = mp.Process(target=do_work,args=(dest_mgrs[k]['q'],)
workers.append(proc)
proc.start()
return workers

if __name__ == "__main__":
source_list #is defined
destination_managers = create_managers()
putters = spawn_putters(destination_managers, source_list)
workers = spawn_workers(destination_managers)


How do I ensure the child processes can operate on the queue?







python python-3.x pickle python-multiprocessing






share|improve this question















share|improve this question













share|improve this question




share|improve this question








edited Nov 14 at 17:18









Konrad Rudolph

394k1017781023




394k1017781023










asked Nov 13 at 20:48









Nate Gardner

582522




582522












  • Can you create a Minimal, Complete, and Verifiable example? The code above can't be executed. There are a number of syntax errors (missing parentheses) and undefined variables. Just fake enough data to reproduce the error. Post the traceback as well.
    – Mark Tolonen
    Nov 14 at 17:21




















  • Can you create a Minimal, Complete, and Verifiable example? The code above can't be executed. There are a number of syntax errors (missing parentheses) and undefined variables. Just fake enough data to reproduce the error. Post the traceback as well.
    – Mark Tolonen
    Nov 14 at 17:21


















Can you create a Minimal, Complete, and Verifiable example? The code above can't be executed. There are a number of syntax errors (missing parentheses) and undefined variables. Just fake enough data to reproduce the error. Post the traceback as well.
– Mark Tolonen
Nov 14 at 17:21






Can you create a Minimal, Complete, and Verifiable example? The code above can't be executed. There are a number of syntax errors (missing parentheses) and undefined variables. Just fake enough data to reproduce the error. Post the traceback as well.
– Mark Tolonen
Nov 14 at 17:21



















active

oldest

votes











Your Answer






StackExchange.ifUsing("editor", function () {
StackExchange.using("externalEditor", function () {
StackExchange.using("snippets", function () {
StackExchange.snippets.init();
});
});
}, "code-snippets");

StackExchange.ready(function() {
var channelOptions = {
tags: "".split(" "),
id: "1"
};
initTagRenderer("".split(" "), "".split(" "), channelOptions);

StackExchange.using("externalEditor", function() {
// Have to fire editor after snippets, if snippets enabled
if (StackExchange.settings.snippets.snippetsEnabled) {
StackExchange.using("snippets", function() {
createEditor();
});
}
else {
createEditor();
}
});

function createEditor() {
StackExchange.prepareEditor({
heartbeatType: 'answer',
autoActivateHeartbeat: false,
convertImagesToLinks: true,
noModals: true,
showLowRepImageUploadWarning: true,
reputationToPostImages: 10,
bindNavPrevention: true,
postfix: "",
imageUploader: {
brandingHtml: "Powered by u003ca class="icon-imgur-white" href="https://imgur.com/"u003eu003c/au003e",
contentPolicyHtml: "User contributions licensed under u003ca href="https://creativecommons.org/licenses/by-sa/3.0/"u003ecc by-sa 3.0 with attribution requiredu003c/au003e u003ca href="https://stackoverflow.com/legal/content-policy"u003e(content policy)u003c/au003e",
allowUrls: true
},
onDemand: true,
discardSelector: ".discard-answer"
,immediatelyShowMarkdownHelp:true
});


}
});














draft saved

draft discarded


















StackExchange.ready(
function () {
StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fstackoverflow.com%2fquestions%2f53289273%2fmanager-queue-cannot-be-pickled%23new-answer', 'question_page');
}
);

Post as a guest















Required, but never shown






























active

oldest

votes













active

oldest

votes









active

oldest

votes






active

oldest

votes
















draft saved

draft discarded




















































Thanks for contributing an answer to Stack Overflow!


  • Please be sure to answer the question. Provide details and share your research!

But avoid



  • Asking for help, clarification, or responding to other answers.

  • Making statements based on opinion; back them up with references or personal experience.


To learn more, see our tips on writing great answers.





Some of your past answers have not been well-received, and you're in danger of being blocked from answering.


Please pay close attention to the following guidance:


  • Please be sure to answer the question. Provide details and share your research!

But avoid



  • Asking for help, clarification, or responding to other answers.

  • Making statements based on opinion; back them up with references or personal experience.


To learn more, see our tips on writing great answers.




draft saved


draft discarded














StackExchange.ready(
function () {
StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fstackoverflow.com%2fquestions%2f53289273%2fmanager-queue-cannot-be-pickled%23new-answer', 'question_page');
}
);

Post as a guest















Required, but never shown





















































Required, but never shown














Required, but never shown












Required, but never shown







Required, but never shown

































Required, but never shown














Required, but never shown












Required, but never shown







Required, but never shown







Popular posts from this blog

How to pass form data using jquery Ajax to insert data in database?

National Museum of Racing and Hall of Fame

Guess what letter conforming each word