Manager().Queue() cannot be pickled
My multiprocessing use-case involves a handful of processes adding data to four queues, and one process per queue working on that data. Because I need to have multiple processes adding to and removing from each queue, it looked like multiprocessing.Manager().Queue() was the right tool for the job.
However, when I spawn my worker processes, I get can't pickle _thread.rlock objects when executing mp.Process(target=my_method, args=(a,queue,)).start().
Here's basically what I'm doing:
import multiprocessing as mp
def create_managers():
destination_managers = {}
for k in desired_queues:
destination_managers[k] = {'man': mp.Manager()}
destination_managers[k]['q'] = destination_managers[k]['man'].Queue()
return destination_managers
def add_to_queue(queue, source):
data = get_item_from_source(source)
queue.put(data)
def getn(q,n):
result = [q.get()]
try:
while len(result) < n:
result.append(q.get(block=False))
except managers.queue.Empty:
pass
return result
def do_work(queue):
while True:
try:
next_chunk = getn(queue,1000) # Get next 1,000 items from queue.
result =
for i in next_chunk:
# Operate on the data
processed_data = process_the_data(next_chunk[i])
result.append(processed_data)
try:
# Upload the processed chunk to a database
insert_to_db(result)
except:
# Sometimes the uploads fail due to unreliable network. Put the data back in the queue so we try again later.
for i in next_chunk:
queue.put(next_chunk[row])
except queue.empty:
pass
def spawn_putters(dest_mgrs, source_list):
putters =
for k in source_list:
proc = mp.Process(target=add_to_queue, args=(dest_mgrs[source_dest_map[k]]['q'],source_list[k],)
putters.append(proc)
proc.start()
return putters
def spawn_workers(dest_mgrs):
workers =
for k in dest_mgrs:
proc = mp.Process(target=do_work,args=(dest_mgrs[k]['q'],)
workers.append(proc)
proc.start()
return workers
if __name__ == "__main__":
source_list #is defined
destination_managers = create_managers()
putters = spawn_putters(destination_managers, source_list)
workers = spawn_workers(destination_managers)
How do I ensure the child processes can operate on the queue?
python python-3.x pickle python-multiprocessing
add a comment |
My multiprocessing use-case involves a handful of processes adding data to four queues, and one process per queue working on that data. Because I need to have multiple processes adding to and removing from each queue, it looked like multiprocessing.Manager().Queue() was the right tool for the job.
However, when I spawn my worker processes, I get can't pickle _thread.rlock objects when executing mp.Process(target=my_method, args=(a,queue,)).start().
Here's basically what I'm doing:
import multiprocessing as mp
def create_managers():
destination_managers = {}
for k in desired_queues:
destination_managers[k] = {'man': mp.Manager()}
destination_managers[k]['q'] = destination_managers[k]['man'].Queue()
return destination_managers
def add_to_queue(queue, source):
data = get_item_from_source(source)
queue.put(data)
def getn(q,n):
result = [q.get()]
try:
while len(result) < n:
result.append(q.get(block=False))
except managers.queue.Empty:
pass
return result
def do_work(queue):
while True:
try:
next_chunk = getn(queue,1000) # Get next 1,000 items from queue.
result =
for i in next_chunk:
# Operate on the data
processed_data = process_the_data(next_chunk[i])
result.append(processed_data)
try:
# Upload the processed chunk to a database
insert_to_db(result)
except:
# Sometimes the uploads fail due to unreliable network. Put the data back in the queue so we try again later.
for i in next_chunk:
queue.put(next_chunk[row])
except queue.empty:
pass
def spawn_putters(dest_mgrs, source_list):
putters =
for k in source_list:
proc = mp.Process(target=add_to_queue, args=(dest_mgrs[source_dest_map[k]]['q'],source_list[k],)
putters.append(proc)
proc.start()
return putters
def spawn_workers(dest_mgrs):
workers =
for k in dest_mgrs:
proc = mp.Process(target=do_work,args=(dest_mgrs[k]['q'],)
workers.append(proc)
proc.start()
return workers
if __name__ == "__main__":
source_list #is defined
destination_managers = create_managers()
putters = spawn_putters(destination_managers, source_list)
workers = spawn_workers(destination_managers)
How do I ensure the child processes can operate on the queue?
python python-3.x pickle python-multiprocessing
Can you create a Minimal, Complete, and Verifiable example? The code above can't be executed. There are a number of syntax errors (missing parentheses) and undefined variables. Just fake enough data to reproduce the error. Post the traceback as well.
– Mark Tolonen
Nov 14 at 17:21
add a comment |
My multiprocessing use-case involves a handful of processes adding data to four queues, and one process per queue working on that data. Because I need to have multiple processes adding to and removing from each queue, it looked like multiprocessing.Manager().Queue() was the right tool for the job.
However, when I spawn my worker processes, I get can't pickle _thread.rlock objects when executing mp.Process(target=my_method, args=(a,queue,)).start().
Here's basically what I'm doing:
import multiprocessing as mp
def create_managers():
destination_managers = {}
for k in desired_queues:
destination_managers[k] = {'man': mp.Manager()}
destination_managers[k]['q'] = destination_managers[k]['man'].Queue()
return destination_managers
def add_to_queue(queue, source):
data = get_item_from_source(source)
queue.put(data)
def getn(q,n):
result = [q.get()]
try:
while len(result) < n:
result.append(q.get(block=False))
except managers.queue.Empty:
pass
return result
def do_work(queue):
while True:
try:
next_chunk = getn(queue,1000) # Get next 1,000 items from queue.
result =
for i in next_chunk:
# Operate on the data
processed_data = process_the_data(next_chunk[i])
result.append(processed_data)
try:
# Upload the processed chunk to a database
insert_to_db(result)
except:
# Sometimes the uploads fail due to unreliable network. Put the data back in the queue so we try again later.
for i in next_chunk:
queue.put(next_chunk[row])
except queue.empty:
pass
def spawn_putters(dest_mgrs, source_list):
putters =
for k in source_list:
proc = mp.Process(target=add_to_queue, args=(dest_mgrs[source_dest_map[k]]['q'],source_list[k],)
putters.append(proc)
proc.start()
return putters
def spawn_workers(dest_mgrs):
workers =
for k in dest_mgrs:
proc = mp.Process(target=do_work,args=(dest_mgrs[k]['q'],)
workers.append(proc)
proc.start()
return workers
if __name__ == "__main__":
source_list #is defined
destination_managers = create_managers()
putters = spawn_putters(destination_managers, source_list)
workers = spawn_workers(destination_managers)
How do I ensure the child processes can operate on the queue?
python python-3.x pickle python-multiprocessing
My multiprocessing use-case involves a handful of processes adding data to four queues, and one process per queue working on that data. Because I need to have multiple processes adding to and removing from each queue, it looked like multiprocessing.Manager().Queue() was the right tool for the job.
However, when I spawn my worker processes, I get can't pickle _thread.rlock objects when executing mp.Process(target=my_method, args=(a,queue,)).start().
Here's basically what I'm doing:
import multiprocessing as mp
def create_managers():
destination_managers = {}
for k in desired_queues:
destination_managers[k] = {'man': mp.Manager()}
destination_managers[k]['q'] = destination_managers[k]['man'].Queue()
return destination_managers
def add_to_queue(queue, source):
data = get_item_from_source(source)
queue.put(data)
def getn(q,n):
result = [q.get()]
try:
while len(result) < n:
result.append(q.get(block=False))
except managers.queue.Empty:
pass
return result
def do_work(queue):
while True:
try:
next_chunk = getn(queue,1000) # Get next 1,000 items from queue.
result =
for i in next_chunk:
# Operate on the data
processed_data = process_the_data(next_chunk[i])
result.append(processed_data)
try:
# Upload the processed chunk to a database
insert_to_db(result)
except:
# Sometimes the uploads fail due to unreliable network. Put the data back in the queue so we try again later.
for i in next_chunk:
queue.put(next_chunk[row])
except queue.empty:
pass
def spawn_putters(dest_mgrs, source_list):
putters =
for k in source_list:
proc = mp.Process(target=add_to_queue, args=(dest_mgrs[source_dest_map[k]]['q'],source_list[k],)
putters.append(proc)
proc.start()
return putters
def spawn_workers(dest_mgrs):
workers =
for k in dest_mgrs:
proc = mp.Process(target=do_work,args=(dest_mgrs[k]['q'],)
workers.append(proc)
proc.start()
return workers
if __name__ == "__main__":
source_list #is defined
destination_managers = create_managers()
putters = spawn_putters(destination_managers, source_list)
workers = spawn_workers(destination_managers)
How do I ensure the child processes can operate on the queue?
python python-3.x pickle python-multiprocessing
python python-3.x pickle python-multiprocessing
edited Nov 14 at 17:18
Konrad Rudolph
394k1017781023
394k1017781023
asked Nov 13 at 20:48
Nate Gardner
582522
582522
Can you create a Minimal, Complete, and Verifiable example? The code above can't be executed. There are a number of syntax errors (missing parentheses) and undefined variables. Just fake enough data to reproduce the error. Post the traceback as well.
– Mark Tolonen
Nov 14 at 17:21
add a comment |
Can you create a Minimal, Complete, and Verifiable example? The code above can't be executed. There are a number of syntax errors (missing parentheses) and undefined variables. Just fake enough data to reproduce the error. Post the traceback as well.
– Mark Tolonen
Nov 14 at 17:21
Can you create a Minimal, Complete, and Verifiable example? The code above can't be executed. There are a number of syntax errors (missing parentheses) and undefined variables. Just fake enough data to reproduce the error. Post the traceback as well.
– Mark Tolonen
Nov 14 at 17:21
Can you create a Minimal, Complete, and Verifiable example? The code above can't be executed. There are a number of syntax errors (missing parentheses) and undefined variables. Just fake enough data to reproduce the error. Post the traceback as well.
– Mark Tolonen
Nov 14 at 17:21
add a comment |
active
oldest
votes
Your Answer
StackExchange.ifUsing("editor", function () {
StackExchange.using("externalEditor", function () {
StackExchange.using("snippets", function () {
StackExchange.snippets.init();
});
});
}, "code-snippets");
StackExchange.ready(function() {
var channelOptions = {
tags: "".split(" "),
id: "1"
};
initTagRenderer("".split(" "), "".split(" "), channelOptions);
StackExchange.using("externalEditor", function() {
// Have to fire editor after snippets, if snippets enabled
if (StackExchange.settings.snippets.snippetsEnabled) {
StackExchange.using("snippets", function() {
createEditor();
});
}
else {
createEditor();
}
});
function createEditor() {
StackExchange.prepareEditor({
heartbeatType: 'answer',
autoActivateHeartbeat: false,
convertImagesToLinks: true,
noModals: true,
showLowRepImageUploadWarning: true,
reputationToPostImages: 10,
bindNavPrevention: true,
postfix: "",
imageUploader: {
brandingHtml: "Powered by u003ca class="icon-imgur-white" href="https://imgur.com/"u003eu003c/au003e",
contentPolicyHtml: "User contributions licensed under u003ca href="https://creativecommons.org/licenses/by-sa/3.0/"u003ecc by-sa 3.0 with attribution requiredu003c/au003e u003ca href="https://stackoverflow.com/legal/content-policy"u003e(content policy)u003c/au003e",
allowUrls: true
},
onDemand: true,
discardSelector: ".discard-answer"
,immediatelyShowMarkdownHelp:true
});
}
});
Sign up or log in
StackExchange.ready(function () {
StackExchange.helpers.onClickDraftSave('#login-link');
});
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
StackExchange.ready(
function () {
StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fstackoverflow.com%2fquestions%2f53289273%2fmanager-queue-cannot-be-pickled%23new-answer', 'question_page');
}
);
Post as a guest
Required, but never shown
active
oldest
votes
active
oldest
votes
active
oldest
votes
active
oldest
votes
Thanks for contributing an answer to Stack Overflow!
- Please be sure to answer the question. Provide details and share your research!
But avoid …
- Asking for help, clarification, or responding to other answers.
- Making statements based on opinion; back them up with references or personal experience.
To learn more, see our tips on writing great answers.
Some of your past answers have not been well-received, and you're in danger of being blocked from answering.
Please pay close attention to the following guidance:
- Please be sure to answer the question. Provide details and share your research!
But avoid …
- Asking for help, clarification, or responding to other answers.
- Making statements based on opinion; back them up with references or personal experience.
To learn more, see our tips on writing great answers.
Sign up or log in
StackExchange.ready(function () {
StackExchange.helpers.onClickDraftSave('#login-link');
});
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
StackExchange.ready(
function () {
StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fstackoverflow.com%2fquestions%2f53289273%2fmanager-queue-cannot-be-pickled%23new-answer', 'question_page');
}
);
Post as a guest
Required, but never shown
Sign up or log in
StackExchange.ready(function () {
StackExchange.helpers.onClickDraftSave('#login-link');
});
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
Sign up or log in
StackExchange.ready(function () {
StackExchange.helpers.onClickDraftSave('#login-link');
});
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
Sign up or log in
StackExchange.ready(function () {
StackExchange.helpers.onClickDraftSave('#login-link');
});
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Can you create a Minimal, Complete, and Verifiable example? The code above can't be executed. There are a number of syntax errors (missing parentheses) and undefined variables. Just fake enough data to reproduce the error. Post the traceback as well.
– Mark Tolonen
Nov 14 at 17:21