Package lib :: Package cuckoo :: Package core :: Module scheduler
[hide private]
[frames] | [no frames]

Source Code for Module lib.cuckoo.core.scheduler

  1  # Copyright (C) 2010-2013 Claudio Guarnieri. 
  2  # Copyright (C) 2014-2016 Cuckoo Foundation. 
  3  # This file is part of Cuckoo Sandbox - http://www.cuckoosandbox.org 
  4  # See the file 'docs/LICENSE' for copying permission. 
  5   
  6  import os 
  7  import time 
  8  import shutil 
  9  import logging 
 10  import threading 
 11  import Queue 
 12   
 13  from lib.cuckoo.common.config import Config, emit_options 
 14  from lib.cuckoo.common.constants import CUCKOO_ROOT 
 15  from lib.cuckoo.common.exceptions import CuckooMachineError, CuckooGuestError 
 16  from lib.cuckoo.common.exceptions import CuckooOperationalError 
 17  from lib.cuckoo.common.exceptions import CuckooCriticalError 
 18  from lib.cuckoo.common.objects import File 
 19  from lib.cuckoo.common.utils import create_folder 
 20  from lib.cuckoo.core.database import Database, TASK_COMPLETED, TASK_REPORTED 
 21  from lib.cuckoo.core.guest import GuestManager 
 22  from lib.cuckoo.core.log import task_log_start, task_log_stop 
 23  from lib.cuckoo.core.plugins import list_plugins, RunAuxiliary, RunProcessing 
 24  from lib.cuckoo.core.plugins import RunSignatures, RunReporting 
 25  from lib.cuckoo.core.resultserver import ResultServer 
 26  from lib.cuckoo.core.rooter import rooter, vpns 
 27   
log = logging.getLogger(__name__)

# Machine manager instance and VM-startup lock. Both are populated by
# Scheduler.initialize() and shared with every AnalysisManager thread.
machinery = None
machine_lock = None

# Serializes updates of the storage/analyses/latest symlink across
# concurrently finishing AnalysisManager threads.
latest_symlink_lock = threading.Lock()

# Number of AnalysisManager threads currently alive; maintained by
# AnalysisManager.run() and read by Scheduler.start() for shutdown.
active_analysis_count = 0
class AnalysisManager(threading.Thread):
    """Analysis Manager.

    This class handles the full analysis process for a given task. It takes
    care of selecting the analysis machine, preparing the configuration and
    interacting with the guest agent and analyzer components to launch and
    complete the analysis and store, process and report its results.

    One instance is started as a daemon thread per task by Scheduler.start().
    """
    def __init__(self, task_id, error_queue):
        """@param task_id: ID of the task to analyze; the task object itself
            is fetched from the database.
        @param error_queue: queue used to transmit exceptions back to the
            scheduler thread (used as IPC)."""
        threading.Thread.__init__(self)

        self.errors = error_queue
        self.cfg = Config()
        # Paths populated later by init_storage() / store_file().
        self.storage = ""
        self.binary = ""
        self.storage_binary = ""
        # Machine assigned by acquire_machine().
        self.machine = None

        self.db = Database()
        self.task = self.db.view_task(task_id)
        self.guest_manager = None

        # Routing state set up by route_network().
        self.interface = None
        self.rt_table = None
62
63 - def init_storage(self):
64 """Initialize analysis storage folder.""" 65 self.storage = os.path.join(CUCKOO_ROOT, 66 "storage", 67 "analyses", 68 str(self.task.id)) 69 70 # If the analysis storage folder already exists, we need to abort the 71 # analysis or previous results will be overwritten and lost. 72 if os.path.exists(self.storage): 73 log.error("Analysis results folder already exists at path \"%s\"," 74 " analysis aborted", self.storage) 75 return False 76 77 # If we're not able to create the analysis storage folder, we have to 78 # abort the analysis. 79 try: 80 create_folder(folder=self.storage) 81 except CuckooOperationalError: 82 log.error("Unable to create analysis folder %s", self.storage) 83 return False 84 85 return True
86
87 - def check_permissions(self):
88 """Checks if we have permissions to access the file to be analyzed.""" 89 if os.access(self.task.target, os.R_OK): 90 return True 91 92 log.error( 93 "Unable to access target file, please check if we have " 94 "permissions to access the file: \"%s\"", 95 self.task.target 96 ) 97 return False
98
99 - def check_file(self):
100 """Checks the integrity of the file to be analyzed.""" 101 sample = self.db.view_sample(self.task.sample_id) 102 103 sha256 = File(self.task.target).get_sha256() 104 if sha256 != sample.sha256: 105 log.error("Target file has been modified after submission: \"%s\"", self.task.target) 106 return False 107 108 return True
109
110 - def store_file(self):
111 """Store a copy of the file being analyzed.""" 112 if not os.path.exists(self.task.target): 113 log.error("The file to analyze does not exist at path \"%s\", " 114 "analysis aborted", self.task.target) 115 return False 116 117 sha256 = File(self.task.target).get_sha256() 118 self.binary = os.path.join(CUCKOO_ROOT, "storage", "binaries", sha256) 119 120 if os.path.exists(self.binary): 121 log.info("File already exists at \"%s\"", self.binary) 122 else: 123 # TODO: do we really need to abort the analysis in case we are not 124 # able to store a copy of the file? 125 try: 126 shutil.copy(self.task.target, self.binary) 127 except (IOError, shutil.Error) as e: 128 log.error("Unable to store file from \"%s\" to \"%s\", " 129 "analysis aborted", self.task.target, self.binary) 130 return False 131 132 try: 133 self.storage_binary = os.path.join(self.storage, "binary") 134 135 if hasattr(os, "symlink"): 136 os.symlink(self.binary, self.storage_binary) 137 else: 138 shutil.copy(self.binary, self.storage_binary) 139 except (AttributeError, OSError) as e: 140 log.error("Unable to create symlink/copy from \"%s\" to " 141 "\"%s\": %s", self.binary, self.storage, e) 142 143 return True
144
145 - def store_task_info(self):
146 """grab latest task from db (if available) and update self.task""" 147 dbtask = self.db.view_task(self.task.id) 148 self.task = dbtask.to_dict() 149 150 task_info_path = os.path.join(self.storage, "task.json") 151 open(task_info_path, "w").write(dbtask.to_json())
152
    def acquire_machine(self):
        """Acquire an analysis machine from the pool of available ones.

        Blocks until a machine matching the task's constraints (machine id,
        platform, tags) has been acquired. On return, machine_lock is held
        by this thread; it is released later, in launch_analysis(), once the
        VM has fully started."""
        machine = None

        # Start a loop to acquire a machine to run the analysis on.
        while True:
            machine_lock.acquire()

            # In some cases it's possible that we enter this loop without
            # having any available machines. We should make sure this is not
            # such case, or the analysis task will fail completely.
            if not machinery.availables():
                machine_lock.release()
                time.sleep(1)
                continue

            # If the user specified a specific machine ID, a platform to be
            # used or machine tags acquire the machine accordingly.
            machine = machinery.acquire(machine_id=self.task.machine,
                                        platform=self.task.platform,
                                        tags=self.task.tags)

            # If no machine is available at this moment, wait for one second
            # and try again.
            if not machine:
                machine_lock.release()
                log.debug("Task #%d: no machine available yet", self.task.id)
                time.sleep(1)
            else:
                log.info("Task #%d: acquired machine %s (label=%s)",
                         self.task.id, machine.name, machine.label)
                break

        self.machine = machine
187
188 - def build_options(self):
189 """Generate analysis options. 190 @return: options dict. 191 """ 192 options = {} 193 194 if self.task.category == "file": 195 options["file_name"] = File(self.task.target).get_name() 196 options["file_type"] = File(self.task.target).get_type() 197 options["pe_exports"] = \ 198 ",".join(File(self.task.target).get_exported_functions()) 199 200 package, activity = File(self.task.target).get_apk_entry() 201 self.task.options["apk_entry"] = "%s:%s" % (package, activity) 202 203 options["id"] = self.task.id 204 options["ip"] = self.machine.resultserver_ip 205 options["port"] = self.machine.resultserver_port 206 options["category"] = self.task.category 207 options["target"] = self.task.target 208 options["package"] = self.task.package 209 options["options"] = emit_options(self.task.options) 210 options["enforce_timeout"] = self.task.enforce_timeout 211 options["clock"] = self.task.clock 212 options["terminate_processes"] = self.cfg.cuckoo.terminate_processes 213 214 if not self.task.timeout: 215 options["timeout"] = self.cfg.timeouts.default 216 else: 217 options["timeout"] = self.task.timeout 218 219 # copy in other analyzer specific options, TEMPORARY (most likely) 220 vm_options = getattr(machinery.options, self.machine.name) 221 for k in vm_options: 222 if k.startswith("analyzer_"): 223 options[k] = vm_options[k] 224 225 return options
226
    def route_network(self):
        """Enable network routing for this analysis if desired.

        Resolves the "route" task option (falling back to the routing
        configuration) into self.interface/self.rt_table, asks the rooter to
        install forwarding and source-routing rules, and records the route
        actually taken in the database."""
        # Determine the desired routing strategy (none, internet, VPN).
        route = self.task.options.get("route", self.cfg.routing.route)

        if route == "none":
            self.interface = None
            self.rt_table = None
        elif route == "internet" and self.cfg.routing.internet != "none":
            self.interface = self.cfg.routing.internet
            self.rt_table = self.cfg.routing.rt_table
        elif route in vpns:
            self.interface = vpns[route].interface
            self.rt_table = vpns[route].rt_table
        else:
            # Unknown destination: fall back to no routing at all.
            log.warning("Unknown network routing destination specified, "
                        "ignoring routing for this analysis: %r", route)
            self.interface = None
            self.rt_table = None

        # Check if the network interface is still available. If a VPN dies for
        # some reason, its tunX interface will no longer be available.
        if self.interface and not rooter("nic_available", self.interface):
            log.error(
                "The network interface '%s' configured for this analysis is "
                "not available at the moment, switching to route=none mode.",
                self.interface
            )
            route = "none"
            self.task.options["route"] = "none"
            self.interface = None
            self.rt_table = None

        if self.interface:
            rooter("forward_enable", self.machine.interface,
                   self.interface, self.machine.ip)

        if self.rt_table:
            rooter("srcroute_enable", self.rt_table, self.machine.ip)

        # Propagate the taken route to the database.
        self.db.set_route(self.task.id, route)
269
270 - def unroute_network(self):
271 if self.interface: 272 rooter("forward_disable", self.machine.interface, 273 self.interface, self.machine.ip) 274 275 if self.rt_table: 276 rooter("srcroute_disable", self.rt_table, self.machine.ip)
277
278 - def wait_finish(self):
279 """Some VMs don't have an actual agent. Mainly those that are used as 280 assistance for an analysis through the services auxiliary module. This 281 method just waits until the analysis is finished rather than actively 282 trying to engage with the Cuckoo Agent.""" 283 self.db.guest_set_status(self.task.id, "running") 284 while self.db.guest_get_status(self.task.id) == "running": 285 time.sleep(1)
286
    def guest_manage(self, options):
        """Drive the in-guest analysis through the guest manager.
        @param options: analysis options dict built by build_options()."""
        # Handle a special case where we're creating a baseline report of this
        # particular virtual machine - a report containing all the results
        # that are gathered if no additional samples are ran in the VM. These
        # results, such as loaded drivers and opened sockets in volatility, or
        # DNS requests to hostnames related to Microsoft Windows, etc may be
        # omitted or at the very least given less priority when creating a
        # report for an analysis that ran on this VM later on.
        if self.task.category == "baseline":
            time.sleep(options["timeout"])
        else:
            # Start the analysis.
            self.db.guest_set_status(self.task.id, "starting")
            monitor = self.task.options.get("monitor", "latest")
            self.guest_manager.start_analysis(options, monitor)

            # In case the Agent didn't respond and we force-quit the analysis
            # at some point while it was still starting the analysis the state
            # will be "stop" (or anything but "running", really).
            if self.db.guest_get_status(self.task.id) == "starting":
                self.db.guest_set_status(self.task.id, "running")
                self.guest_manager.wait_for_completion()

            self.db.guest_set_status(self.task.id, "stopping")
311
    def launch_analysis(self):
        """Start analysis: prepare storage and the sample, acquire a VM, run
        the guest-side analysis and tear everything down again.
        @return: True when the analysis ran to completion, False otherwise."""
        succeeded = False

        target = self.task.target
        if self.task.category == "file":
            target = os.path.basename(target)

        log.info("Starting analysis of %s \"%s\" (task #%d, options \"%s\")",
                 self.task.category.upper(), target, self.task.id,
                 emit_options(self.task.options))

        # Initialize the analysis folders.
        if not self.init_storage():
            return False

        # Initiates per-task logging.
        task_log_start(self.task.id)

        self.store_task_info()

        if self.task.category == "file":
            # Check if we have permissions to access the file.
            # And fail this analysis if we don't have access to the file.
            if not self.check_permissions():
                return False

            # Check whether the file has been changed for some unknown reason.
            # And fail this analysis if it has been modified.
            if not self.check_file():
                return False

            # Store a copy of the original file.
            if not self.store_file():
                return False

        # Acquire analysis machine.
        try:
            self.acquire_machine()
        except CuckooOperationalError as e:
            machine_lock.release()
            log.error("Cannot acquire machine: {0}".format(e))
            return False

        # At this point we can tell the ResultServer about it.
        try:
            ResultServer().add_task(self.task, self.machine)
        except Exception as e:
            machinery.release(self.machine.label)
            self.errors.put(e)
            # NOTE(review): execution continues here even though the
            # ResultServer does not know about this task - confirm whether
            # this fall-through is intentional.

        # Initialize the guest manager.
        self.guest_manager = GuestManager(
            self.machine.name, self.machine.ip,
            self.machine.platform, self.task.id, self
        )

        self.aux = RunAuxiliary(self.task, self.machine, self.guest_manager)
        self.aux.start()

        # Generate the analysis configuration file.
        options = self.build_options()

        try:
            unlocked = False
            self.interface = None

            # Mark the selected analysis machine in the database as started.
            guest_log = self.db.guest_start(self.task.id,
                                            self.machine.name,
                                            self.machine.label,
                                            machinery.__class__.__name__)
            # NOTE(review): if guest_start() itself raises, guest_log is never
            # bound and the guest_stop(guest_log) call in the finally block
            # below raises NameError - worth confirming and fixing upstream.

            # Start the machine.
            machinery.start(self.machine.label, self.task)

            # Enable network routing.
            self.route_network()

            # By the time start returns it will have fully started the Virtual
            # Machine. We can now safely release the machine lock.
            machine_lock.release()
            unlocked = True

            # Run and manage the components inside the guest unless this
            # machine has the "noagent" option specified (please refer to the
            # wait_finish() function for more details on this function).
            if "noagent" not in self.machine.options:
                self.guest_manage(options)
            else:
                self.wait_finish()

            succeeded = True
        except CuckooMachineError as e:
            if not unlocked:
                machine_lock.release()
            log.error(
                "Machinery error: %s",
                e, extra={"task_id": self.task.id}
            )
            log.critical(
                "A critical error has occurred trying to use the machine "
                "with name %s during an analysis due to which it is no "
                "longer in a working state, please report this issue and all "
                "of the related environment details to the developers so we "
                "can improve this situation. (Note that before we would "
                "simply remove this VM from doing any more analyses, but as "
                "all the VMs will eventually be depleted that way, hopefully "
                "we'll find a better solution now).", self.machine.name,
            )
        except CuckooGuestError as e:
            if not unlocked:
                machine_lock.release()
            log.error(
                "Error from the Cuckoo Guest: %s",
                e, extra={"task_id": self.task.id}
            )
        finally:
            # Stop Auxiliary modules.
            self.aux.stop()

            # Take a memory dump of the machine before shutting it off.
            if self.cfg.cuckoo.memory_dump or self.task.memory:
                try:
                    dump_path = os.path.join(self.storage, "memory.dmp")
                    machinery.dump_memory(self.machine.label, dump_path)
                except NotImplementedError:
                    log.error("The memory dump functionality is not available "
                              "for the current machine manager.")
                except CuckooMachineError as e:
                    log.error("Machinery error: %s", e)

            try:
                # Stop the analysis machine.
                machinery.stop(self.machine.label)
            except CuckooMachineError as e:
                log.warning("Unable to stop machine %s: %s",
                            self.machine.label, e)

            # Mark the machine in the database as stopped. Unless this machine
            # has been marked as dead, we just keep it as "started" in the
            # database so it'll not be used later on in this session.
            self.db.guest_stop(guest_log)

            # After all this, we can make the ResultServer forget about the
            # internal state for this analysis task.
            ResultServer().del_task(self.task, self.machine)

            # Drop the network routing rules if any.
            self.unroute_network()

            try:
                # Release the analysis machine. But only if the machine has
                # not turned dead yet.
                machinery.release(self.machine.label)
            except CuckooMachineError as e:
                log.error("Unable to release machine %s, reason %s. "
                          "You might need to restore it manually.",
                          self.machine.label, e)

        return succeeded
472
473 - def process_results(self):
474 """Process the analysis results and generate the enabled reports.""" 475 results = RunProcessing(task=self.task).run() 476 RunSignatures(results=results).run() 477 RunReporting(task=self.task, results=results).run() 478 479 # If the target is a file and the user enabled the option, 480 # delete the original copy. 481 if self.task.category == "file" and self.cfg.cuckoo.delete_original: 482 if not os.path.exists(self.task.target): 483 log.warning("Original file does not exist anymore: \"%s\": " 484 "File not found.", self.task.target) 485 else: 486 try: 487 os.remove(self.task.target) 488 except OSError as e: 489 log.error("Unable to delete original file at path " 490 "\"%s\": %s", self.task.target, e) 491 492 # If the target is a file and the user enabled the delete copy of 493 # the binary option, then delete the copy. 494 if self.task.category == "file" and self.cfg.cuckoo.delete_bin_copy: 495 if not os.path.exists(self.binary): 496 log.warning("Copy of the original file does not exist anymore: \"%s\": File not found", self.binary) 497 else: 498 try: 499 os.remove(self.binary) 500 except OSError as e: 501 log.error("Unable to delete the copy of the original file at path \"%s\": %s", self.binary, e) 502 # Check if the binary in the analysis directory is an invalid symlink. If it is, delete it. 503 if os.path.islink(self.storage_binary) and not os.path.exists(self.storage_binary): 504 try: 505 os.remove(self.storage_binary) 506 except OSError as e: 507 log.error("Unable to delete symlink to the binary copy at path \"%s\": %s", self.storage_binary, e) 508 509 log.info("Task #%d: reports generation completed (path=%s)", 510 self.task.id, self.storage) 511 512 return True
513
    def run(self):
        """Run manager thread: perform the analysis, process the results and
        maintain the storage/analyses/latest debugging symlink."""
        global active_analysis_count
        active_analysis_count += 1
        try:
            self.launch_analysis()

            self.db.set_status(self.task.id, TASK_COMPLETED)

            log.debug("Released database task #%d", self.task.id)

            if self.cfg.cuckoo.process_results:
                # This updates self.task so processing gets the latest and
                # greatest.
                self.store_task_info()

                self.process_results()
                self.db.set_status(self.task.id, TASK_REPORTED)

            # We make a symbolic link ("latest") which links to the latest
            # analysis - this is useful for debugging purposes. This is only
            # supported under systems that support symbolic links.
            if hasattr(os, "symlink"):
                latest = os.path.join(CUCKOO_ROOT, "storage",
                                      "analyses", "latest")

                # First we have to remove the existing symbolic link, then we
                # have to create the new one.
                # Deal with race conditions using a lock.
                latest_symlink_lock.acquire()
                try:
                    # As per documentation, lexists() returns True for dead
                    # symbolic links.
                    if os.path.lexists(latest):
                        os.remove(latest)

                    os.symlink(self.storage, latest)
                except OSError as e:
                    log.warning("Error pointing latest analysis symlink: %s" % e)
                finally:
                    latest_symlink_lock.release()

            # Overwrite task.json so we have the latest data inside.
            self.store_task_info()
            log.info("Task #%d: analysis procedure completed", self.task.id)
        except:
            # Top-level boundary for this worker thread: never let an
            # exception escape, just log it.
            log.exception("Failure in AnalysisManager.run")

        task_log_stop(self.task.id)
        active_analysis_count -= 1
563
class Scheduler(object):
    """Tasks Scheduler.

    This class is responsible for the main execution loop of the tool. It
    prepares the analysis machines and keeps waiting for and loading new
    analysis tasks.
    Whenever a new task is available, it launches AnalysisManager which will
    take care of running the full analysis process and operating with the
    assigned analysis machine.
    """
    def __init__(self, maxcount=None):
        """@param maxcount: maximum number of analyses to perform before
            exiting, or None to take the value from the configuration file
            in start()."""
        # Main-loop flag; cleared by stop().
        self.running = True
        self.cfg = Config()
        self.db = Database()
        self.maxcount = maxcount
        # Number of analyses dispatched so far, compared against maxcount.
        self.total_analysis_count = 0
580
581 - def initialize(self):
582 """Initialize the machine manager.""" 583 global machinery, machine_lock 584 585 machinery_name = self.cfg.cuckoo.machinery 586 587 max_vmstartup_count = self.cfg.cuckoo.max_vmstartup_count 588 if max_vmstartup_count: 589 machine_lock = threading.Semaphore(max_vmstartup_count) 590 else: 591 machine_lock = threading.Lock() 592 593 log.info("Using \"%s\" as machine manager", machinery_name) 594 595 # Get registered class name. Only one machine manager is imported, 596 # therefore there should be only one class in the list. 597 plugin = list_plugins("machinery")[0] 598 # Initialize the machine manager. 599 machinery = plugin() 600 601 # Find its configuration file. 602 conf = os.path.join(CUCKOO_ROOT, "conf", "%s.conf" % machinery_name) 603 604 if not os.path.exists(conf): 605 raise CuckooCriticalError("The configuration file for machine " 606 "manager \"{0}\" does not exist at path:" 607 " {1}".format(machinery_name, conf)) 608 609 # Provide a dictionary with the configuration options to the 610 # machine manager instance. 611 machinery.set_options(Config(machinery_name)) 612 613 # Initialize the machine manager. 614 try: 615 machinery.initialize(machinery_name) 616 except CuckooMachineError as e: 617 raise CuckooCriticalError("Error initializing machines: %s" % e) 618 619 # At this point all the available machines should have been identified 620 # and added to the list. If none were found, Cuckoo needs to abort the 621 # execution. 
622 if not len(machinery.machines()): 623 raise CuckooCriticalError("No machines available.") 624 else: 625 log.info("Loaded %s machine/s", len(machinery.machines())) 626 627 if len(machinery.machines()) > 1 and self.db.engine.name == "sqlite": 628 log.warning("As you've configured Cuckoo to execute parallel " 629 "analyses, we recommend you to switch to a MySQL or" 630 "a PostgreSQL database as SQLite might cause some " 631 "issues.") 632 633 if len(machinery.machines()) > 4 and self.cfg.cuckoo.process_results: 634 log.warning("When running many virtual machines it is recommended " 635 "to process the results in a separate process.py to " 636 "increase throughput and stability. Please read the " 637 "documentation about the `Processing Utility`.") 638 639 # Drop all existing packet forwarding rules for each VM. Just in case 640 # Cuckoo was terminated for some reason and various forwarding rules 641 # have thus not been dropped yet. 642 for machine in machinery.machines(): 643 if not machine.interface: 644 log.info("Unable to determine the network interface for VM " 645 "with name %s, Cuckoo will not be able to give it " 646 "full internet access or route it through a VPN! " 647 "Please define a default network interface for the " 648 "machinery or define a network interface for each " 649 "VM.", machine.name) 650 continue 651 652 # Drop forwarding rule to each VPN. 653 for vpn in vpns.values(): 654 rooter("forward_disable", machine.interface, 655 vpn.interface, machine.ip) 656 657 # Drop forwarding rule to the internet / dirty line. 658 if self.cfg.routing.internet != "none": 659 rooter("forward_disable", machine.interface, 660 self.cfg.routing.internet, machine.ip)
661
662 - def stop(self):
663 """Stop scheduler.""" 664 self.running = False 665 # Shutdown machine manager (used to kill machines that still alive). 666 machinery.shutdown()
667
    def start(self):
        """Start scheduler: run the main loop that fetches pending tasks and
        dispatches them to AnalysisManager daemon threads until stop() is
        called or the maximum analysis count is reached."""
        self.initialize()

        log.info("Waiting for analysis tasks.")

        # Message queue with threads to transmit exceptions (used as IPC).
        errors = Queue.Queue()

        # Command-line overrides the configuration file.
        if self.maxcount is None:
            self.maxcount = self.cfg.cuckoo.max_analysis_count

        # This loop runs forever.
        while self.running:
            time.sleep(1)

            # Wait until the machine lock is not locked. This is only the case
            # when all machines are fully running, rather than about to start
            # or still busy starting. This way we won't have race conditions
            # with finding out there are no available machines in the analysis
            # manager or having two analyses pick the same machine.
            if not machine_lock.acquire(False):
                continue

            machine_lock.release()

            # If not enough free disk space is available, then we print an
            # error message and wait another round (this check is ignored
            # when the freespace configuration variable is set to zero).
            if self.cfg.cuckoo.freespace:
                # Resolve the full base path to the analysis folder, just in
                # case somebody decides to make a symbolic link out of it.
                dir_path = os.path.join(CUCKOO_ROOT, "storage", "analyses")

                # TODO: Windows support
                if hasattr(os, "statvfs"):
                    dir_stats = os.statvfs(dir_path)

                    # Calculate the free disk space in megabytes.
                    # NOTE(review): under Python 2 this is integer division;
                    # a Python 3 port would yield a float here - confirm
                    # before porting.
                    space_available = dir_stats.f_bavail * dir_stats.f_frsize
                    space_available /= 1024 * 1024

                    if space_available < self.cfg.cuckoo.freespace:
                        log.error("Not enough free disk space! (Only %d MB!)",
                                  space_available)
                        continue

            # Have we limited the number of concurrently executing machines?
            if self.cfg.cuckoo.max_machines_count:
                # Are too many running?
                if len(machinery.running()) >= self.cfg.cuckoo.max_machines_count:
                    continue

            # If no machines are available, it's pointless to fetch for
            # pending tasks.
            if not machinery.availables():
                continue

            # Exits if max_analysis_count is defined in the configuration
            # file and has been reached.
            if self.maxcount and self.total_analysis_count >= self.maxcount:
                if active_analysis_count <= 0:
                    log.debug("Reached max analysis count, exiting.")
                    self.stop()
                continue

            # Fetch a pending analysis task.
            # TODO This fixes only submissions by --machine, need to add
            # other attributes (tags etc).
            # TODO We should probably move the entire "acquire machine" logic
            # from the Analysis Manager to the Scheduler and then pass the
            # selected machine onto the Analysis Manager instance.
            task, available = None, False
            for machine in self.db.get_available_machines():
                task = self.db.fetch(machine=machine.name)
                if task:
                    break

                if machine.is_analysis():
                    available = True

            # We only fetch a new task if at least one of the available
            # machines is not a "service" machine (again, please refer to the
            # services auxiliary module for more information on service VMs).
            if not task and available:
                task = self.db.fetch(service=False)

            if task:
                log.debug("Processing task #%s", task.id)
                self.total_analysis_count += 1

                # Initialize and start the analysis manager.
                analysis = AnalysisManager(task.id, errors)
                analysis.daemon = True
                analysis.start()

            # Deal with errors.
            try:
                raise errors.get(block=False)
            except Queue.Empty:
                pass

        log.debug("End of analyses.")
772