1
2
3
4
5
6 import os
7 import time
8 import shutil
9 import logging
10 import threading
11 import Queue
12
13 from lib.cuckoo.common.config import Config, emit_options
14 from lib.cuckoo.common.constants import CUCKOO_ROOT
15 from lib.cuckoo.common.exceptions import CuckooMachineError, CuckooGuestError
16 from lib.cuckoo.common.exceptions import CuckooOperationalError
17 from lib.cuckoo.common.exceptions import CuckooCriticalError
18 from lib.cuckoo.common.objects import File
19 from lib.cuckoo.common.utils import create_folder
20 from lib.cuckoo.core.database import Database, TASK_COMPLETED, TASK_REPORTED
21 from lib.cuckoo.core.guest import GuestManager
22 from lib.cuckoo.core.log import task_log_start, task_log_stop
23 from lib.cuckoo.core.plugins import list_plugins, RunAuxiliary, RunProcessing
24 from lib.cuckoo.core.plugins import RunSignatures, RunReporting
25 from lib.cuckoo.core.resultserver import ResultServer
26 from lib.cuckoo.core.rooter import rooter, vpns
27
# Module-wide logger.
log = logging.getLogger(__name__)

# Machinery plugin instance and the lock/semaphore used around machine
# start-up. Both stay None until the scheduler's initialization code assigns
# them (they are declared `global` there).
machinery = None
machine_lock = None
# NOTE(review): not referenced in the visible part of this file.
latest_symlink_lock = threading.Lock()

# Read by the scheduler loop when deciding whether to exit after the maximum
# analysis count was reached. NOTE(review): no writer is visible in this
# extract.
active_analysis_count = 0
35
37 """Analysis Manager.
38
39 This class handles the full analysis process for a given task. It takes
40 care of selecting the analysis machine, preparing the configuration and
41 interacting with the guest agent and analyzer components to launch and
42 complete the analysis and store, process and report its results.
43 """
44
def __init__(self, task_id, error_queue):
    """Set up the analysis thread for a single task.

    @param task_id: ID of the task to analyze; the task itself is loaded
        from the database.
    @param error_queue: queue on which exceptions are handed back to the
        scheduler.
    """
    threading.Thread.__init__(self)

    self.errors = error_queue
    self.cfg = Config()

    # Paths are filled in later by the storage/file preparation steps.
    self.storage = ""
    self.binary = ""
    self.storage_binary = ""
    self.machine = None

    self.db = Database()
    self.task = self.db.view_task(task_id)
    self.guest_manager = None

    # Network routing state, set when routing is enabled for the analysis.
    self.interface = None
    self.rt_table = None
def init_storage(self):
    """Initialize analysis storage folder.

    @return: True when the folder was created, False when it already
        exists or could not be created.
    """
    # NOTE(review): the `def` line was missing from the reviewed text; the
    # name is taken from the `self.init_storage()` call site - confirm.
    self.storage = os.path.join(CUCKOO_ROOT,
                                "storage",
                                "analyses",
                                str(self.task.id))

    # An already existing results folder aborts the analysis.
    if os.path.exists(self.storage):
        log.error("Analysis results folder already exists at path \"%s\","
                  " analysis aborted", self.storage)
        return False

    try:
        create_folder(folder=self.storage)
    except CuckooOperationalError:
        log.error("Unable to create analysis folder %s", self.storage)
        return False

    return True
86
def check_permissions(self):
    """Checks if we have permissions to access the file to be analyzed.

    @return: True when the target file is readable, False otherwise.
    """
    # NOTE(review): the `def` line was missing from the reviewed text; the
    # name is taken from the `self.check_permissions()` call site - confirm.
    if os.access(self.task.target, os.R_OK):
        return True

    log.error(
        "Unable to access target file, please check if we have "
        "permissions to access the file: \"%s\"",
        self.task.target
    )
    return False
98
109
def store_file(self):
    """Store a copy of the file being analyzed.

    The file is copied into the binaries folder under its SHA256 and then
    linked (or copied, where symlinks are unavailable) into the analysis
    folder as "binary".

    @return: False when the target is missing or cannot be stored, True
        otherwise. A failure to create the per-analysis link/copy is only
        logged.
    """
    # NOTE(review): the `def` line was missing from the reviewed text; the
    # name is taken from the `self.store_file()` call site - confirm.
    if not os.path.exists(self.task.target):
        log.error("The file to analyze does not exist at path \"%s\", "
                  "analysis aborted", self.task.target)
        return False

    sha256 = File(self.task.target).get_sha256()
    self.binary = os.path.join(CUCKOO_ROOT, "storage", "binaries", sha256)

    if os.path.exists(self.binary):
        log.info("File already exists at \"%s\"", self.binary)
    else:
        try:
            shutil.copy(self.task.target, self.binary)
        except (IOError, shutil.Error):
            log.error("Unable to store file from \"%s\" to \"%s\", "
                      "analysis aborted", self.task.target, self.binary)
            return False

    self.storage_binary = os.path.join(self.storage, "binary")

    try:
        if hasattr(os, "symlink"):
            os.symlink(self.binary, self.storage_binary)
        else:
            shutil.copy(self.binary, self.storage_binary)
    except (AttributeError, OSError, IOError, shutil.Error) as e:
        # Best effort: the copy fallback may raise IOError/shutil.Error, so
        # those are handled too, and the real destination path is reported.
        log.error("Unable to create symlink/copy from \"%s\" to "
                  "\"%s\": %s", self.binary, self.storage_binary, e)

    return True
144
def store_task_info(self):
    """Reload the task from the database and dump it to task.json.

    self.task is replaced with the result of the task's to_dict() and the
    task's JSON form is written to "task.json" in the analysis folder.
    """
    # NOTE(review): the `def` line was missing from the reviewed text; the
    # name is taken from the `self.store_task_info()` call site - confirm.
    dbtask = self.db.view_task(self.task.id)
    self.task = dbtask.to_dict()

    task_info_path = os.path.join(self.storage, "task.json")
    # Use a context manager so the file is flushed and closed right away.
    with open(task_info_path, "w") as handle:
        handle.write(dbtask.to_json())
152
187
226
def route_network(self):
    """Enable network routing if desired.

    The route comes from the task's "route" option, falling back to the
    configured default. It selects the outgoing interface and routing
    table, forwarding is enabled through the rooter, and the effective
    route is recorded in the database.
    """
    # NOTE(review): the `def` line was missing from the reviewed text; the
    # name is taken from the `self.route_network()` call site - confirm.
    route = self.task.options.get("route", self.cfg.routing.route)

    if route == "none":
        self.interface = None
        self.rt_table = None
    elif route == "internet" and self.cfg.routing.internet != "none":
        self.interface = self.cfg.routing.internet
        self.rt_table = self.cfg.routing.rt_table
    elif route in vpns:
        self.interface = vpns[route].interface
        self.rt_table = vpns[route].rt_table
    else:
        log.warning("Unknown network routing destination specified, "
                    "ignoring routing for this analysis: %r", route)
        self.interface = None
        self.rt_table = None

    # Fall back to no routing when the selected interface is unavailable.
    if self.interface and not rooter("nic_available", self.interface):
        log.error(
            "The network interface '%s' configured for this analysis is "
            "not available at the moment, switching to route=none mode.",
            self.interface
        )
        route = "none"
        self.task.options["route"] = "none"
        self.interface = None
        self.rt_table = None

    if self.interface:
        rooter("forward_enable", self.machine.interface,
               self.interface, self.machine.ip)

    if self.rt_table:
        rooter("srcroute_enable", self.rt_table, self.machine.ip)

    # Record the route that is actually in effect for this task.
    self.db.set_route(self.task.id, route)
269
277
def wait_finish(self):
    """Some VMs don't have an actual agent. Mainly those that are used as
    assistance for an analysis through the services auxiliary module. This
    method just waits until the analysis is finished rather than actively
    trying to engage with the Cuckoo Agent."""
    # NOTE(review): the `def` line was missing from the reviewed text; the
    # name is taken from the `self.wait_finish()` call site - confirm.
    self.db.guest_set_status(self.task.id, "running")

    # Poll once per second until the guest status is no longer "running".
    while self.db.guest_get_status(self.task.id) == "running":
        time.sleep(1)
286
311
def launch_analysis(self):
    """Start analysis.

    Prepares storage and the sample, acquires and starts a machine, runs
    the guest analysis and finally tears everything down again.

    @return: True when the guest analysis ran to completion, False
        otherwise.
    """
    # NOTE(review): the `def` line was missing from the reviewed text and
    # the method name is not visible in it; `launch_analysis` is presumed -
    # confirm against the caller.
    succeeded = False

    target = self.task.target
    if self.task.category == "file":
        target = os.path.basename(target)

    log.info("Starting analysis of %s \"%s\" (task #%d, options \"%s\")",
             self.task.category.upper(), target, self.task.id,
             emit_options(self.task.options))

    if not self.init_storage():
        return False

    task_log_start(self.task.id)

    self.store_task_info()

    if self.task.category == "file":
        if not self.check_permissions():
            return False

        if not self.check_file():
            return False

        if not self.store_file():
            return False

    try:
        self.acquire_machine()
    except CuckooOperationalError as e:
        machine_lock.release()
        log.error("Cannot acquire machine: %s", e)
        return False

    try:
        ResultServer().add_task(self.task, self.machine)
    except Exception as e:
        # NOTE(review): the error is reported to the scheduler but the
        # analysis continues with the released machine - confirm intent.
        machinery.release(self.machine.label)
        self.errors.put(e)

    self.guest_manager = GuestManager(
        self.machine.name, self.machine.ip,
        self.machine.platform, self.task.id, self
    )

    self.aux = RunAuxiliary(self.task, self.machine, self.guest_manager)
    self.aux.start()

    options = self.build_options()

    unlocked = False
    # Track whether guest_start() completed so the cleanup below never
    # touches an unassigned guest_log.
    guest_started = False
    guest_log = None

    try:
        self.interface = None

        guest_log = self.db.guest_start(self.task.id,
                                        self.machine.name,
                                        self.machine.label,
                                        machinery.__class__.__name__)
        guest_started = True

        machinery.start(self.machine.label, self.task)

        self.route_network()

        # The machine is up, so the start-up lock can be handed on.
        machine_lock.release()
        unlocked = True

        if "noagent" not in self.machine.options:
            self.guest_manage(options)
        else:
            self.wait_finish()

        succeeded = True
    except CuckooMachineError as e:
        log.error(
            "Machinery error: %s",
            e, extra={"task_id": self.task.id}
        )
        log.critical(
            "A critical error has occurred trying to use the machine "
            "with name %s during an analysis due to which it is no "
            "longer in a working state, please report this issue and all "
            "of the related environment details to the developers so we "
            "can improve this situation. (Note that before we would "
            "simply remove this VM from doing any more analyses, but as "
            "all the VMs will eventually be depleted that way, hopefully "
            "we'll find a better solution now).", self.machine.name,
        )
    except CuckooGuestError as e:
        log.error(
            "Error from the Cuckoo Guest: %s",
            e, extra={"task_id": self.task.id}
        )
    finally:
        # Release the start-up lock on every failure path, including
        # exception types that are not handled above.
        if not unlocked:
            machine_lock.release()

        self.aux.stop()

        if self.cfg.cuckoo.memory_dump or self.task.memory:
            try:
                dump_path = os.path.join(self.storage, "memory.dmp")
                machinery.dump_memory(self.machine.label, dump_path)
            except NotImplementedError:
                log.error("The memory dump functionality is not available "
                          "for the current machine manager.")
            except CuckooMachineError as e:
                log.error("Machinery error: %s", e)

        try:
            machinery.stop(self.machine.label)
        except CuckooMachineError as e:
            log.warning("Unable to stop machine %s: %s",
                        self.machine.label, e)

        if guest_started:
            self.db.guest_stop(guest_log)

        ResultServer().del_task(self.task, self.machine)

        self.unroute_network()

        try:
            machinery.release(self.machine.label)
        except CuckooMachineError as e:
            log.error("Unable to release machine %s, reason %s. "
                      "You might need to restore it manually.",
                      self.machine.label, e)

    return succeeded
472
def process_results(self):
    """Process the analysis results and generate the enabled reports.

    After processing, signatures and reporting have run, the submitted
    file and/or its stored copy are deleted for file tasks when the
    corresponding configuration options are enabled.

    @return: always True.
    """
    # NOTE(review): the `def` line was missing from the reviewed text and
    # the method name is not visible in it; `process_results` is presumed -
    # confirm against the caller.
    results = RunProcessing(task=self.task).run()
    RunSignatures(results=results).run()
    RunReporting(task=self.task, results=results).run()

    # Optionally delete the originally submitted file.
    if self.task.category == "file" and self.cfg.cuckoo.delete_original:
        if not os.path.exists(self.task.target):
            log.warning("Original file does not exist anymore: \"%s\": "
                        "File not found.", self.task.target)
        else:
            try:
                os.remove(self.task.target)
            except OSError as e:
                log.error("Unable to delete original file at path "
                          "\"%s\": %s", self.task.target, e)

    # Optionally delete the stored copy of the binary.
    if self.task.category == "file" and self.cfg.cuckoo.delete_bin_copy:
        if not os.path.exists(self.binary):
            log.warning("Copy of the original file does not exist anymore: "
                        "\"%s\": File not found", self.binary)
        else:
            try:
                os.remove(self.binary)
            except OSError as e:
                log.error("Unable to delete the copy of the original file "
                          "at path \"%s\": %s", self.binary, e)

        # Remove the per-analysis symlink if it is now dangling.
        # NOTE(review): nesting under the delete_bin_copy branch is
        # reconstructed (indentation was lost in the reviewed text) - confirm.
        if (os.path.islink(self.storage_binary) and
                not os.path.exists(self.storage_binary)):
            try:
                os.remove(self.storage_binary)
            except OSError as e:
                log.error("Unable to delete symlink to the binary copy at "
                          "path \"%s\": %s", self.storage_binary, e)

    log.info("Task #%d: reports generation completed (path=%s)",
             self.task.id, self.storage)

    return True
513
563
565 """Tasks Scheduler.
566
This class is responsible for the main execution loop of the tool. It
prepares the analysis machines and keeps waiting for and loading new
analysis tasks.
570 Whenever a new task is available, it launches AnalysisManager which will
571 take care of running the full analysis process and operating with the
572 assigned analysis machine.
573 """
def __init__(self, maxcount=None):
    """Set up the scheduler state.

    @param maxcount: maximum number of analyses to launch; when None the
        limit is read from the configuration once the scheduler starts.
    """
    # NOTE(review): the `def` line was missing from the reviewed text; the
    # signature is reconstructed from the `maxcount` assignment below and
    # the default of None is an assumption - confirm.
    self.running = True
    self.cfg = Config()
    self.db = Database()
    self.maxcount = maxcount
    # Number of analyses launched so far by this scheduler.
    self.total_analysis_count = 0
580
def initialize(self):
    """Initialize the machine manager.

    Creates the global start-up lock, instantiates and configures the
    machinery plugin, verifies that machines are available and disables
    any forwarding rules for each machine.

    @raise CuckooCriticalError: if the machinery configuration file is
        missing, the machinery fails to initialize or no machines are
        available.
    """
    # NOTE(review): the `def` line was missing from the reviewed text; the
    # name is taken from the `self.initialize()` call site - confirm.
    global machinery, machine_lock

    machinery_name = self.cfg.cuckoo.machinery

    # A semaphore allows several machines to start at once when configured;
    # otherwise start-ups are serialized with a plain lock.
    max_vmstartup_count = self.cfg.cuckoo.max_vmstartup_count
    if max_vmstartup_count:
        machine_lock = threading.Semaphore(max_vmstartup_count)
    else:
        machine_lock = threading.Lock()

    log.info("Using \"%s\" as machine manager", machinery_name)

    # The first registered machinery plugin is the one that gets used.
    plugin = list_plugins("machinery")[0]
    machinery = plugin()

    conf = os.path.join(CUCKOO_ROOT, "conf", "%s.conf" % machinery_name)

    if not os.path.exists(conf):
        raise CuckooCriticalError("The configuration file for machine "
                                  "manager \"{0}\" does not exist at path:"
                                  " {1}".format(machinery_name, conf))

    machinery.set_options(Config(machinery_name))

    try:
        machinery.initialize(machinery_name)
    except CuckooMachineError as e:
        raise CuckooCriticalError("Error initializing machines: %s" % e)

    # Query the machine list once and reuse it for all checks below.
    machines = machinery.machines()

    if not machines:
        raise CuckooCriticalError("No machines available.")

    log.info("Loaded %s machine/s", len(machines))

    if len(machines) > 1 and self.db.engine.name == "sqlite":
        log.warning("As you've configured Cuckoo to execute parallel "
                    "analyses, we recommend you to switch to a MySQL or "
                    "a PostgreSQL database as SQLite might cause some "
                    "issues.")

    if len(machines) > 4 and self.cfg.cuckoo.process_results:
        log.warning("When running many virtual machines it is recommended "
                    "to process the results in a separate process.py to "
                    "increase throughput and stability. Please read the "
                    "documentation about the `Processing Utility`.")

    # Disable forwarding for every machine that has a known interface.
    for machine in machines:
        if not machine.interface:
            log.info("Unable to determine the network interface for VM "
                     "with name %s, Cuckoo will not be able to give it "
                     "full internet access or route it through a VPN! "
                     "Please define a default network interface for the "
                     "machinery or define a network interface for each "
                     "VM.", machine.name)
            continue

        for vpn in vpns.values():
            rooter("forward_disable", machine.interface,
                   vpn.interface, machine.ip)

        if self.cfg.routing.internet != "none":
            rooter("forward_disable", machine.interface,
                   self.cfg.routing.internet, machine.ip)
661
667
def start(self):
    """Start scheduler.

    Initializes the machinery and then loops once per second, launching
    an AnalysisManager thread for every task that can be fetched while
    resources allow it. Exceptions queued by analysis threads are
    re-raised from this loop.
    """
    # NOTE(review): the `def` line was missing from the reviewed text and
    # the method name is not visible in it; `start` is presumed - confirm.
    self.initialize()

    log.info("Waiting for analysis tasks.")

    # Queue through which analysis threads hand exceptions back.
    errors = Queue.Queue()

    # Fall back to the configured limit when no explicit one was given.
    if self.maxcount is None:
        self.maxcount = self.cfg.cuckoo.max_analysis_count

    while self.running:
        time.sleep(1)

        # Skip this round while the start-up lock is held elsewhere; the
        # lock is only probed here, not kept.
        if not machine_lock.acquire(False):
            continue

        machine_lock.release()

        if self.cfg.cuckoo.freespace:
            dir_path = os.path.join(CUCKOO_ROOT, "storage", "analyses")

            # os.statvfs is not available on every platform.
            if hasattr(os, "statvfs"):
                dir_stats = os.statvfs(dir_path)

                # Free space available to us, in whole megabytes.
                space_available = dir_stats.f_bavail * dir_stats.f_frsize
                space_available //= 1024 * 1024

                if space_available < self.cfg.cuckoo.freespace:
                    log.error("Not enough free disk space! (Only %d MB!)",
                              space_available)
                    continue

        # Respect the configured cap on concurrently running machines.
        if self.cfg.cuckoo.max_machines_count:
            if len(machinery.running()) >= self.cfg.cuckoo.max_machines_count:
                continue

        if not machinery.availables():
            continue

        # Once the analysis limit is reached, stop when nothing is active
        # anymore and launch no further tasks in the meantime.
        if self.maxcount and self.total_analysis_count >= self.maxcount:
            if active_analysis_count <= 0:
                log.debug("Reached max analysis count, exiting.")
                self.stop()
            continue

        # Prefer a task bound to one of the available machines.
        task, available = None, False
        for machine in self.db.get_available_machines():
            task = self.db.fetch(machine=machine.name)
            if task:
                break

            if machine.is_analysis():
                available = True

        # Otherwise take any non-service task if an analysis machine is
        # available.
        if not task and available:
            task = self.db.fetch(service=False)

        if task:
            log.debug("Processing task #%s", task.id)
            self.total_analysis_count += 1

            analysis = AnalysisManager(task.id, errors)
            analysis.daemon = True
            analysis.start()

        # Re-raise the next exception reported by an analysis thread, if any.
        try:
            raise errors.get(block=False)
        except Queue.Empty:
            pass

    log.debug("End of analyses.")
772