To enforce a timeout on a task submitted with apply_async from Python's multiprocessing module, call get() on the returned result object and pass it a timeout parameter. Here's an example code snippet that demonstrates this:
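import multiprocessing
import time


def my_function(x):
    # Simulate a slow task: wait 1 second, then return the square.
    time.sleep(1)
    return x * x


if __name__ == '__main__':
    # Leaving the with block terminates the pool and its workers.
    with multiprocessing.Pool() as pool:
        result = pool.apply_async(my_function, args=(10,))
        try:
            # Wait at most 0.5 seconds for the result.
            output = result.get(timeout=0.5)
            print(output)
        except multiprocessing.TimeoutError:
            print("Timeout occurred.")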
In this example, we define a function my_function that takes x as an argument and returns its square after waiting for 1 second. We then create a multiprocessing.Pool object and use its apply_async method to run my_function in a separate process. apply_async itself takes no timeout and returns immediately.

The result object returned by apply_async is a multiprocessing.pool.AsyncResult, which we use to retrieve the output of the function call. We call its get method with the timeout parameter set to 0.5 seconds. If the function call has not completed by then, a multiprocessing.TimeoutError exception is raised and we print a message indicating that a timeout occurred. Note that the timeout only limits how long get waits. It does not stop the worker, which keeps running the task until it finishes or the pool is terminated.
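If you also need the task itself to stop when the timeout expires, one option is to terminate the pool. Here is a minimal sketch of that idea. The names slow_square and run_with_timeout are made up for illustration and are not part of multiprocessing. It relies on the fact that leaving a `with multiprocessing.Pool(...)` block calls terminate() on the pool.

```python
import multiprocessing
import time


def slow_square(x, delay):
    """Hypothetical worker: sleep for `delay` seconds, then return x squared."""
    time.sleep(delay)
    return x * x


def run_with_timeout(func, args, timeout):
    """Run func(*args) in a one-worker pool and return (finished, value).

    Hypothetical helper. On timeout it returns (False, None); leaving the
    with block then calls pool.terminate(), which stops the worker process
    instead of letting it keep running in the background.
    """
    with multiprocessing.Pool(processes=1) as pool:
        async_result = pool.apply_async(func, args)
        try:
            return True, async_result.get(timeout=timeout)
        except multiprocessing.TimeoutError:
            return False, None


if __name__ == '__main__':
    # Finishes well inside the timeout.
    print(run_with_timeout(slow_square, (10, 0.1), timeout=2))
    # Would take 5 seconds, so the 0.5 second timeout fires first.
    print(run_with_timeout(slow_square, (10, 5), timeout=0.5))
```

Creating a pool per call is slow, so this is only reasonable for occasional long-running tasks. For many short tasks you would probably keep one pool and accept that a timed-out task keeps running to completion.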
Asked: 2023-01-04 11:00:00 +0000
Last updated: Dec 25 '22