ulvis.paste.net

Paste Search Dynamic
Recent pastes
Multiprocess daemon sample in Python
  1. import concurrent.futures
  2. import logging
  3. import multiprocessing
  4. import random
  5. import time
  6.  
  7. import pebble
  8.  
  9. logging.basicConfig(
  10.     level=logging.INFO,
  11.     format='%(asctime)s %(processName)s[%(process)d] %(message)s'
  12. )
  13. log = logging.getLogger('daemon')
  14.  
  15. WORKER_COUNT = multiprocessing.cpu_count()
  16. TASK_TIMEOUT = 2
  17.  
  18.  
  19. def work(task_id):
  20.     pause_length = random.randint(1, 40) / 10
  21.     log.info('Launch worker #%d, pause %.1f', task_id, pause_length)
  22.     time.sleep(pause_length)
  23.     assert random.randint(0, 1)
  24.  
  25.  
  26. if __name__ == '__main__':
  27.     tasks = {}
  28.  
  29.     log.info('Starting')
  30.  
  31.     task_queue = sorted(range(0, 20, 2))
  32.     with pebble.ProcessPool(max_tasks=4) as pool:
  33.         while task_queue:
  34.             if len(tasks) <= WORKER_COUNT:
  35.                 task_id = task_queue.pop(0)
  36.                 tasks[task_id] = pool.schedule(
  37.                     work,
  38.                     args=(task_id, ),
  39.                     timeout=TASK_TIMEOUT,
  40.                 )
  41.  
  42.             for task_id, task in set(tasks.items()):
  43.                 if task.done():
  44.                     try:
  45.                         task.result()
  46.                         log.info('Removing task #%d [SUCCESS]', task_id)
  47.                     except assertionerror:
  48.                         log.info('Removing task #%d [FAILURE]', task_id)
  49.                     except concurrent.futures.TimeoutError:
  50.                         log.info('Removing task #%d [TIMEOUT]', task_id)
  51.                     del tasks[task_id]
  52.  
  53.             time.sleep(.1)
  54.  
  55.         log.info('Shutting down')
  56.  
  57.     log.info('Finished')
  58.  
  59.     exit(0)
Parsed in 0.014 seconds