Package lib :: Package cuckoo :: Package core :: Module database
[hide private]
[frames] | no frames]

Source Code for Module lib.cuckoo.core.database

   1  # Copyright (C) 2010-2014 Cuckoo Foundation. 
   2  # This file is part of Cuckoo Sandbox - http://www.cuckoosandbox.org 
   3  # See the file 'docs/LICENSE' for copying permission. 
   4   
   5  import os 
   6  import json 
   7  import logging 
   8  from datetime import datetime 
   9   
  10  from lib.cuckoo.common.config import Config 
  11  from lib.cuckoo.common.constants import CUCKOO_ROOT 
  12  from lib.cuckoo.common.exceptions import CuckooDatabaseError 
  13  from lib.cuckoo.common.exceptions import CuckooOperationalError 
  14  from lib.cuckoo.common.exceptions import CuckooDependencyError 
  15  from lib.cuckoo.common.objects import File, URL 
  16  from lib.cuckoo.common.utils import create_folder, Singleton 
  17   
  18  try: 
  19      from sqlalchemy import create_engine, Column 
  20      from sqlalchemy import Integer, String, Boolean, DateTime, Enum 
  21      from sqlalchemy import ForeignKey, Text, Index, Table 
  22      from sqlalchemy.ext.declarative import declarative_base 
  23      from sqlalchemy.exc import SQLAlchemyError, IntegrityError 
  24      from sqlalchemy.orm import sessionmaker, relationship, joinedload, backref 
  25      from sqlalchemy.pool import NullPool 
  26      Base = declarative_base() 
  27  except ImportError: 
  28      raise CuckooDependencyError("Unable to import sqlalchemy " 
  29                                  "(install with `pip install sqlalchemy`)") 
  30   
  31  log = logging.getLogger(__name__) 
  32   
  33  SCHEMA_VERSION = "263a45963c72" 
  34  TASK_PENDING = "pending" 
  35  TASK_RUNNING = "running" 
  36  TASK_COMPLETED = "completed" 
  37  TASK_RECOVERED = "recovered" 
  38  TASK_REPORTED = "reported" 
  39  TASK_FAILED_ANALYSIS = "failed_analysis" 
  40  TASK_FAILED_PROCESSING = "failed_processing" 
  41   
  42  # Secondary table used in association Machine - Tag. 
  43  machines_tags = Table("machines_tags", Base.metadata, 
  44      Column("machine_id", Integer, ForeignKey("machines.id")), 
  45      Column("tag_id", Integer, ForeignKey("tags.id")) 
  46  ) 
  47   
  48  # Secondary table used in association Task - Tag. 
  49  tasks_tags = Table("tasks_tags", Base.metadata, 
  50      Column("task_id", Integer, ForeignKey("tasks.id")), 
  51      Column("tag_id", Integer, ForeignKey("tags.id")) 
  52  ) 
  53   
class Machine(Base):
    """Configured virtual machines to be used as guests."""
    __tablename__ = "machines"

    id = Column(Integer(), primary_key=True)
    name = Column(String(255), nullable=False)
    label = Column(String(255), nullable=False)
    ip = Column(String(255), nullable=False)
    platform = Column(String(255), nullable=False)
    # Many-to-many link to Tag through the machines_tags association table.
    tags = relationship("Tag",
                        secondary=machines_tags,
                        cascade="all, delete",
                        single_parent=True,
                        backref=backref("machine", cascade="all"))
    interface = Column(String(255), nullable=True)
    snapshot = Column(String(255), nullable=True)
    locked = Column(Boolean(), nullable=False, default=False)
    locked_changed_on = Column(DateTime(timezone=False), nullable=True)
    status = Column(String(255), nullable=True)
    status_changed_on = Column(DateTime(timezone=False), nullable=True)
    resultserver_ip = Column(String(255), nullable=False)
    resultserver_port = Column(String(255), nullable=False)

    def __init__(self, name, label, ip, platform, interface, snapshot,
                 resultserver_ip, resultserver_port):
        """Store the machine attributes on the new row.
        @param name: machine name
        @param label: machine label
        @param ip: machine IP address
        @param platform: machine platform
        @param interface: interface associated with the machine
        @param snapshot: snapshot name
        @param resultserver_ip: IP address of the result server
        @param resultserver_port: port of the result server
        """
        self.name = name
        self.label = label
        self.ip = ip
        self.platform = platform
        self.interface = interface
        self.snapshot = snapshot
        self.resultserver_ip = resultserver_ip
        self.resultserver_port = resultserver_port

    def __repr__(self):
        return "<Machine('{0}','{1}')>".format(self.id, self.name)

    def to_dict(self):
        """Converts object to dict.
        @return: dict mapping column names to values, plus a "tags" list
        """
        result = {}
        for col in self.__table__.columns:
            attr = getattr(self, col.name)
            # Datetimes are rendered as strings so the dict can be dumped
            # to JSON by to_json().
            if isinstance(attr, datetime):
                attr = attr.strftime("%Y-%m-%d %H:%M:%S")
            result[col.name] = attr

        # Tags are a relation, not a column, so they are added separately.
        result["tags"] = [t.name for t in self.tags]
        return result

    def to_json(self):
        """Converts object to JSON.
        @return: JSON data
        """
        as_dict = self.to_dict()
        return json.dumps(as_dict)
117
class Tag(Base):
    """Tag describing anything you want."""
    __tablename__ = "tags"

    id = Column(Integer(), primary_key=True)
    # Tag names are unique across the table.
    name = Column(String(255), nullable=False, unique=True)

    def __init__(self, name):
        """@param name: tag name."""
        self.name = name

    def __repr__(self):
        template = "<Tag('{0}','{1}')>"
        return template.format(self.id, self.name)
131
class Guest(Base):
    """Tracks guest run."""
    __tablename__ = "guests"

    id = Column(Integer(), primary_key=True)
    name = Column(String(255), nullable=False)
    label = Column(String(255), nullable=False)
    manager = Column(String(255), nullable=False)
    # Defaults to the insertion time.
    started_on = Column(DateTime(timezone=False),
                        default=datetime.now,
                        nullable=False)
    shutdown_on = Column(DateTime(timezone=False), nullable=True)
    # unique=True: at most one guest row per task.
    task_id = Column(Integer,
                     ForeignKey("tasks.id"),
                     nullable=False,
                     unique=True)

    def __init__(self, name, label, manager):
        """Store the guest attributes on the new row.
        @param name: guest name
        @param label: guest label
        @param manager: machine manager name
        """
        self.name = name
        self.label = label
        self.manager = manager

    def __repr__(self):
        return "<Guest('{0}','{1}')>".format(self.id, self.name)

    def to_dict(self):
        """Converts object to dict.
        @return: dict mapping column names to values
        """
        result = {}
        for col in self.__table__.columns:
            attr = getattr(self, col.name)
            # Datetimes are rendered as strings so the dict can be dumped
            # to JSON by to_json().
            if isinstance(attr, datetime):
                attr = attr.strftime("%Y-%m-%d %H:%M:%S")
            result[col.name] = attr
        return result

    def to_json(self):
        """Converts object to JSON.
        @return: JSON data
        """
        as_dict = self.to_dict()
        return json.dumps(as_dict)
175
class Sample(Base):
    """Submitted files details."""
    __tablename__ = "samples"

    id = Column(Integer(), primary_key=True)
    file_size = Column(Integer(), nullable=False)
    file_type = Column(String(255), nullable=False)
    md5 = Column(String(32), nullable=False)
    crc32 = Column(String(8), nullable=False)
    sha1 = Column(String(40), nullable=False)
    sha256 = Column(String(64), nullable=False)
    sha512 = Column(String(128), nullable=False)
    ssdeep = Column(String(255), nullable=True)
    # A single unique index over the combination of all hash columns.
    __table_args__ = (
        Index("hash_index", "md5", "crc32", "sha1", "sha256", "sha512",
              unique=True),
    )

    def __init__(self, md5, crc32, sha1, sha256, sha512, file_size,
                 file_type=None, ssdeep=None):
        """Store the sample attributes on the new row.
        @param md5: MD5 hash
        @param crc32: CRC32 checksum
        @param sha1: SHA1 hash
        @param sha256: SHA256 hash
        @param sha512: SHA512 hash
        @param file_size: file size
        @param file_type: file type, only stored when truthy
        @param ssdeep: ssdeep hash, only stored when truthy
        """
        self.md5 = md5
        self.sha1 = sha1
        self.crc32 = crc32
        self.sha256 = sha256
        self.sha512 = sha512
        self.file_size = file_size
        # Optional attributes are left unset when no value is given.
        if file_type:
            self.file_type = file_type
        if ssdeep:
            self.ssdeep = ssdeep

    def __repr__(self):
        return "<Sample('{0}','{1}')>".format(self.id, self.sha256)

    def to_dict(self):
        """Converts object to dict.
        @return: dict mapping column names to values
        """
        result = {}
        for col in self.__table__.columns:
            result[col.name] = getattr(self, col.name)
        return result

    def to_json(self):
        """Converts object to JSON.
        @return: JSON data
        """
        as_dict = self.to_dict()
        return json.dumps(as_dict)
235
class Error(Base):
    """Analysis errors."""
    __tablename__ = "errors"

    id = Column(Integer(), primary_key=True)
    message = Column(String(255), nullable=False)
    task_id = Column(Integer, ForeignKey("tasks.id"), nullable=False)

    def __init__(self, message, task_id):
        """Store the error attributes on the new row.
        @param message: error message
        @param task_id: ID of the related task
        """
        self.message = message
        self.task_id = task_id

    def __repr__(self):
        template = "<Error('{0}','{1}','{2}')>"
        return template.format(self.id, self.message, self.task_id)

    def to_dict(self):
        """Converts object to dict.
        @return: dict mapping column names to values
        """
        result = {}
        for col in self.__table__.columns:
            result[col.name] = getattr(self, col.name)
        return result

    def to_json(self):
        """Converts object to JSON.
        @return: JSON data
        """
        as_dict = self.to_dict()
        return json.dumps(as_dict)
266
class Task(Base):
    """Analysis task queue."""
    __tablename__ = "tasks"

    id = Column(Integer(), primary_key=True)
    target = Column(Text(), nullable=False)
    category = Column(String(255), nullable=False)
    timeout = Column(Integer(), server_default="0", nullable=False)
    priority = Column(Integer(), server_default="1", nullable=False)
    custom = Column(String(255), nullable=True)
    machine = Column(String(255), nullable=True)
    package = Column(String(255), nullable=True)
    # Many-to-many link to Tag through the tasks_tags association table.
    # lazy="subquery" loads the tags together with the task.
    tags = relationship("Tag",
                        secondary=tasks_tags,
                        cascade="all, delete",
                        single_parent=True,
                        backref=backref("task", cascade="all"),
                        lazy="subquery")
    options = Column(String(255), nullable=True)
    platform = Column(String(255), nullable=True)
    memory = Column(Boolean, nullable=False, default=False)
    enforce_timeout = Column(Boolean, nullable=False, default=False)
    clock = Column(DateTime(timezone=False),
                   default=datetime.now,
                   nullable=False)
    added_on = Column(DateTime(timezone=False),
                      default=datetime.now,
                      nullable=False)
    started_on = Column(DateTime(timezone=False), nullable=True)
    completed_on = Column(DateTime(timezone=False), nullable=True)
    # NOTE(review): TASK_FAILED_ANALYSIS and TASK_FAILED_PROCESSING are
    # declared at module level but are not members of this Enum - confirm
    # whether that is intended before storing those statuses.
    status = Column(Enum(TASK_PENDING,
                         TASK_RUNNING,
                         TASK_COMPLETED,
                         TASK_REPORTED,
                         TASK_RECOVERED,
                         name="status_type"),
                    server_default=TASK_PENDING,
                    nullable=False)
    sample_id = Column(Integer, ForeignKey("samples.id"), nullable=True)
    sample = relationship("Sample", backref="tasks")
    guest = relationship("Guest",
                         uselist=False,
                         backref="tasks",
                         cascade="save-update, delete")
    errors = relationship("Error",
                          backref="tasks",
                          cascade="save-update, delete")

    def __init__(self, target=None):
        """@param target: analysis target, stored in the target column."""
        self.target = target

    def __repr__(self):
        return "<Task('{0}','{1}')>".format(self.id, self.target)

    def to_dict(self):
        """Converts object to dict.
        @return: dict mapping column names to values, plus a "tags" list
        """
        result = {}
        for col in self.__table__.columns:
            attr = getattr(self, col.name)
            # Datetimes are rendered as strings so the dict can be dumped
            # to JSON by to_json().
            if isinstance(attr, datetime):
                attr = attr.strftime("%Y-%m-%d %H:%M:%S")
            result[col.name] = attr

        # Tags are a relation, not a column, so they are added separately.
        result["tags"] = [t.name for t in self.tags]
        return result

    def to_json(self):
        """Converts object to JSON.
        @return: JSON data
        """
        as_dict = self.to_dict()
        return json.dumps(as_dict)
335
class AlembicVersion(Base):
    """Single-column table recording the database schema release in use."""
    __tablename__ = "alembic_version"

    # Schema revision identifier; also the primary key.
    version_num = Column(String(32), primary_key=True, nullable=False)
341
class Database(object):
    """Analysis queue database.

    Creates the SQLAlchemy engine and the schema used for the internal
    analysis queue and provides helper methods to interact with it. Every
    helper opens its own session and closes it before returning.
    """
    __metaclass__ = Singleton

    def __init__(self, dsn=None):
        """Create the engine, the schema and the session factory.
        @param dsn: database connection string. When empty, the connection
                    from the configuration is used, falling back to a SQLite
                    file under CUCKOO_ROOT/db.
        @raise CuckooDatabaseError: if the database directory or the schema
                                    cannot be created, or the schema version
                                    cannot be stored.
        """
        cfg = Config()

        if dsn:
            self.engine = create_engine(dsn, poolclass=NullPool)
        elif cfg.database.connection:
            self.engine = create_engine(cfg.database.connection,
                                        poolclass=NullPool)
        else:
            db_file = os.path.join(CUCKOO_ROOT, "db", "cuckoo.db")
            if not os.path.exists(db_file):
                db_dir = os.path.dirname(db_file)
                if not os.path.exists(db_dir):
                    try:
                        create_folder(folder=db_dir)
                    except CuckooOperationalError as e:
                        raise CuckooDatabaseError("Unable to create database directory: {0}".format(e))

            self.engine = create_engine("sqlite:///{0}".format(db_file),
                                        poolclass=NullPool)

        # Disable SQL logging. Turn it on for debugging.
        self.engine.echo = False
        # Connection timeout.
        if cfg.database.timeout:
            self.engine.pool_timeout = cfg.database.timeout
        else:
            self.engine.pool_timeout = 60
        # Create schema.
        try:
            Base.metadata.create_all(self.engine)
        except SQLAlchemyError as e:
            raise CuckooDatabaseError("Unable to create or connect to database: {0}".format(e))

        # Get db session.
        self.Session = sessionmaker(bind=self.engine)

        # Set database schema version.
        # TODO: it's a little bit dirty, needs refactoring.
        tmp_session = self.Session()
        try:
            if tmp_session.query(AlembicVersion).count() == 0:
                tmp_session.add(AlembicVersion(version_num=SCHEMA_VERSION))
                try:
                    tmp_session.commit()
                except SQLAlchemyError as e:
                    # Roll back before raising; placed after the raise it
                    # would never run.
                    tmp_session.rollback()
                    raise CuckooDatabaseError("Unable to set schema version: {0}".format(e))
        finally:
            tmp_session.close()

    def __del__(self):
        """Disconnects pool."""
        # __init__ may have raised before the engine was created.
        engine = getattr(self, "engine", None)
        if engine is not None:
            engine.dispose()

    def _get_or_create(self, session, model, **kwargs):
        """Get an ORM instance or create it if not exist.
        @param session: SQLAlchemy session object
        @param model: model to query
        @param kwargs: column values used both as filter and as constructor
                       arguments
        @return: row instance; a newly created one is NOT added to the
                 session by this method
        """
        instance = session.query(model).filter_by(**kwargs).first()
        if instance:
            return instance
        return model(**kwargs)

    def clean_machines(self):
        """Clean old stored machines and related tables."""
        # Secondary table.
        # TODO: this is better done via cascade delete.
        self.engine.execute(machines_tags.delete())

        session = self.Session()
        try:
            session.query(Machine).delete()
            session.commit()
        except SQLAlchemyError as e:
            log.debug("Database error cleaning machines: {0}".format(e))
            session.rollback()
        finally:
            session.close()

    def add_machine(self,
                    name,
                    label,
                    ip,
                    platform,
                    tags,
                    interface,
                    snapshot,
                    resultserver_ip,
                    resultserver_port):
        """Add a guest machine.
        @param name: machine id
        @param label: machine label
        @param ip: machine IP address
        @param platform: machine supported platform
        @param tags: comma separated tag names (i.e. foo,bar,baz) or empty
        @param interface: sniffing interface for this machine
        @param snapshot: snapshot name to use instead of the current one, if configured
        @param resultserver_ip: IP address of the Result Server
        @param resultserver_port: port of the Result Server
        """
        session = self.Session()
        machine = Machine(name=name,
                          label=label,
                          ip=ip,
                          platform=platform,
                          interface=interface,
                          snapshot=snapshot,
                          resultserver_ip=resultserver_ip,
                          resultserver_port=resultserver_port)
        # Deal with tags format (i.e. foo,bar,baz)
        if tags:
            for tag in tags.replace(" ", "").split(","):
                machine.tags.append(self._get_or_create(session, Tag, name=tag))
        session.add(machine)

        try:
            session.commit()
        except SQLAlchemyError as e:
            log.debug("Database error adding machine: {0}".format(e))
            session.rollback()
        finally:
            session.close()

    def set_status(self, task_id, status):
        """Set task status.
        Also stamps started_on / completed_on when the new status is
        TASK_RUNNING / TASK_COMPLETED. Nothing is returned.
        @param task_id: task identifier
        @param status: status string
        """
        session = self.Session()
        try:
            row = session.query(Task).get(task_id)
            row.status = status

            if status == TASK_RUNNING:
                row.started_on = datetime.now()
            elif status == TASK_COMPLETED:
                row.completed_on = datetime.now()

            session.commit()
        except SQLAlchemyError as e:
            log.debug("Database error setting status: {0}".format(e))
            session.rollback()
        finally:
            session.close()

    def fetch(self, lock=True):
        """Fetches a task waiting to be processed and locks it for running.
        @param lock: when True the task status is set to TASK_RUNNING
        @return: None or task
        """
        session = self.Session()
        row = None

        try:
            # Highest priority first, oldest first within the same priority.
            pending = session.query(Task).filter(Task.status == TASK_PENDING)
            row = pending.order_by(Task.priority.desc(), Task.added_on).first()

            if not row:
                return None

            if lock:
                # set_status() works on its own session, so reload the row
                # to pick up the new status and timestamp.
                self.set_status(task_id=row.id, status=TASK_RUNNING)
                session.refresh(row)
        except SQLAlchemyError as e:
            log.debug("Database error fetching task: {0}".format(e))
            session.rollback()
        finally:
            session.close()

        return row

    def guest_start(self, task_id, name, label, manager):
        """Logs guest start.
        @param task_id: task identifier
        @param name: vm name
        @param label: vm label
        @param manager: vm manager
        @return: guest row id, or None on database error
        """
        session = self.Session()
        guest = Guest(name, label, manager)
        try:
            session.query(Task).get(task_id).guest = guest
            session.commit()
            session.refresh(guest)
            guest_id = guest.id
        except SQLAlchemyError as e:
            log.debug("Database error logging guest start: {0}".format(e))
            session.rollback()
            return None
        finally:
            session.close()
        return guest_id

    def guest_remove(self, guest_id):
        """Removes a guest start entry.
        @param guest_id: guest log entry id
        """
        session = self.Session()
        try:
            guest = session.query(Guest).get(guest_id)
            session.delete(guest)
            session.commit()
        except SQLAlchemyError as e:
            log.debug("Database error logging guest remove: {0}".format(e))
            session.rollback()
            return None
        finally:
            session.close()

    def guest_stop(self, guest_id):
        """Logs guest stop.
        @param guest_id: guest log entry id
        """
        session = self.Session()
        try:
            session.query(Guest).get(guest_id).shutdown_on = datetime.now()
            session.commit()
        except SQLAlchemyError as e:
            log.debug("Database error logging guest stop: {0}".format(e))
            session.rollback()
        finally:
            session.close()

    def list_machines(self, locked=False):
        """Lists virtual machines.
        @param locked: when True only locked machines are returned
        @return: list of virtual machines, or None on database error
        """
        session = self.Session()
        try:
            query = session.query(Machine).options(joinedload("tags"))
            if locked:
                # SQL expression, not a Python comparison.
                query = query.filter(Machine.locked == True)
            machines = query.all()
        except SQLAlchemyError as e:
            log.debug("Database error listing machines: {0}".format(e))
            return None
        finally:
            session.close()
        return machines

    def lock_machine(self, name=None, platform=None, tags=None):
        """Places a lock on a free virtual machine.
        @param name: optional virtual machine name
        @param platform: optional virtual machine platform
        @param tags: optional tags required (list)
        @return: locked machine, or None when the arguments are inconsistent,
                 no matching machine is free, or a database error occurs
        @raise CuckooOperationalError: if no machine at all matches the
                                       selection criteria
        """
        # Preventive checks, done before opening a session so that the
        # early returns cannot leak one.
        if name and platform:
            # Wrong usage.
            log.error("You can select machine only by name or by platform.")
            return None
        elif name and tags:
            # Also wrong usage
            log.error("You can select machine only by name or by tags.")
            return None

        session = self.Session()
        try:
            machines = session.query(Machine)
            if name:
                machines = machines.filter(Machine.name == name)
            if platform:
                machines = machines.filter(Machine.platform == platform)
            if tags:
                for tag in tags:
                    machines = machines.filter(Machine.tags.any(name=tag.name))

            # Check if there are any machines that satisfy the
            # selection requirements.
            if machines.count() == 0:
                raise CuckooOperationalError("No machines match selection criteria")

            # Get only free machines, and only one of them.
            machine = machines.filter(Machine.locked == False).first()

            if machine:
                machine.locked = True
                machine.locked_changed_on = datetime.now()
                session.commit()
                session.refresh(machine)
        except SQLAlchemyError as e:
            log.debug("Database error locking machine: {0}".format(e))
            session.rollback()
            return None
        finally:
            # Runs on every path, including the CuckooOperationalError above
            # and the "no free machine" case.
            session.close()

        return machine

    def unlock_machine(self, label):
        """Remove lock from a virtual machine.
        @param label: virtual machine label
        @return: unlocked machine, or None when no machine has that label or
                 a database error occurs
        """
        session = self.Session()
        try:
            machine = session.query(Machine).filter(Machine.label == label).first()

            if machine:
                machine.locked = False
                machine.locked_changed_on = datetime.now()
                session.commit()
                session.refresh(machine)
        except SQLAlchemyError as e:
            log.debug("Database error unlocking machine: {0}".format(e))
            session.rollback()
            return None
        finally:
            # Also closes the session when no machine was found.
            session.close()

        return machine

    def count_machines_available(self):
        """How many virtual machines are ready for analysis.
        @return: free virtual machines count, 0 on database error
        """
        session = self.Session()
        try:
            machines_count = session.query(Machine).filter(Machine.locked == False).count()
        except SQLAlchemyError as e:
            log.debug("Database error counting machines: {0}".format(e))
            return 0
        finally:
            session.close()
        return machines_count

    def set_machine_status(self, label, status):
        """Set status for a virtual machine.
        Does nothing when no machine has the given label.
        @param label: virtual machine label
        @param status: new virtual machine status
        """
        session = self.Session()
        try:
            machine = session.query(Machine).filter(Machine.label == label).first()

            if machine:
                machine.status = status
                machine.status_changed_on = datetime.now()
                session.commit()
                session.refresh(machine)
        except SQLAlchemyError as e:
            log.debug("Database error setting machine status: {0}".format(e))
            session.rollback()
        finally:
            session.close()

    def add_error(self, message, task_id):
        """Add an error related to a task.
        @param message: error message
        @param task_id: ID of the related task
        """
        session = self.Session()
        error = Error(message=message, task_id=task_id)
        session.add(error)
        try:
            session.commit()
        except SQLAlchemyError as e:
            log.debug("Database error adding error log: {0}".format(e))
            session.rollback()
        finally:
            session.close()

    # The following functions are mostly used by external utils.

    def add(self,
            obj,
            timeout=0,
            package="",
            options="",
            priority=1,
            custom="",
            machine="",
            platform="",
            tags=None,
            memory=False,
            enforce_timeout=False,
            clock=None):
        """Add a task to database.
        @param obj: object to add (File or URL).
        @param timeout: selected timeout.
        @param package: analysis package.
        @param options: analysis options.
        @param priority: analysis priority.
        @param custom: custom options.
        @param machine: selected machine.
        @param platform: platform.
        @param tags: optional tags that must be set for machine selection,
                     as a comma separated string (i.e. foo,bar,baz)
        @param memory: toggle full memory dump.
        @param enforce_timeout: toggle full timeout execution.
        @param clock: virtual machine clock time, either a datetime or a
                      string in "%m-%d-%Y %H:%M:%S" format
        @return: ID of the new task, or None if obj is neither a File nor a
                 URL or a database error occurs.
        """
        # Convert empty strings and None values to a valid int
        if not timeout:
            timeout = 0
        if not priority:
            priority = 1

        session = self.Session()
        try:
            if isinstance(obj, File):
                sample = Sample(md5=obj.get_md5(),
                                crc32=obj.get_crc32(),
                                sha1=obj.get_sha1(),
                                sha256=obj.get_sha256(),
                                sha512=obj.get_sha512(),
                                file_size=obj.get_size(),
                                file_type=obj.get_type(),
                                ssdeep=obj.get_ssdeep())
                session.add(sample)

                try:
                    session.commit()
                except IntegrityError:
                    # The insert violated a constraint (the sample is taken
                    # to be stored already): reuse the existing row.
                    session.rollback()
                    sample = session.query(Sample).filter(Sample.md5 == obj.get_md5()).first()
                    if sample is None:
                        log.debug("Database error adding task: existing sample not found")
                        return None

                task = Task(obj.file_path)
                task.sample_id = sample.id
            elif isinstance(obj, URL):
                task = Task(obj.url)
            else:
                # Without this branch "task" would be unbound below.
                log.error("Unable to add task: unsupported object type "
                          "'{0}'".format(type(obj).__name__))
                return None

            task.category = obj.__class__.__name__.lower()
            task.timeout = timeout
            task.package = package
            task.options = options
            task.priority = priority
            task.custom = custom
            task.machine = machine
            task.platform = platform
            task.memory = memory
            task.enforce_timeout = enforce_timeout

            # Deal with tags format (i.e. foo,bar,baz)
            if tags:
                for tag in tags.replace(" ", "").split(","):
                    task.tags.append(self._get_or_create(session, Tag, name=tag))

            if clock:
                if isinstance(clock, (str, unicode)):
                    try:
                        task.clock = datetime.strptime(clock, "%m-%d-%Y %H:%M:%S")
                    except ValueError:
                        log.warning("The date you specified has an invalid format, using current timestamp")
                        task.clock = datetime.now()
                else:
                    task.clock = clock

            session.add(task)
            session.commit()
            # Read the ID while the session is still open.
            return task.id
        except SQLAlchemyError as e:
            log.debug("Database error adding task: {0}".format(e))
            session.rollback()
            return None
        finally:
            session.close()

    def add_path(self,
                 file_path,
                 timeout=0,
                 package="",
                 options="",
                 priority=1,
                 custom="",
                 machine="",
                 platform="",
                 tags=None,
                 memory=False,
                 enforce_timeout=False,
                 clock=None):
        """Add a task to database from file path.
        @param file_path: sample path.
        @param timeout: selected timeout.
        @param package: analysis package.
        @param options: analysis options.
        @param priority: analysis priority.
        @param custom: custom options.
        @param machine: selected machine.
        @param platform: platform.
        @param tags: Tags required in machine selection
        @param memory: toggle full memory dump.
        @param enforce_timeout: toggle full timeout execution.
        @param clock: virtual machine clock time
        @return: ID of the new task, or None if the path is empty or does
                 not exist, or the task could not be stored.
        """
        if not file_path or not os.path.exists(file_path):
            return None

        # Empty timeout/priority values are normalised by add().
        return self.add(File(file_path),
                        timeout=timeout,
                        package=package,
                        options=options,
                        priority=priority,
                        custom=custom,
                        machine=machine,
                        platform=platform,
                        tags=tags,
                        memory=memory,
                        enforce_timeout=enforce_timeout,
                        clock=clock)

    def add_url(self,
                url,
                timeout=0,
                package="",
                options="",
                priority=1,
                custom="",
                machine="",
                platform="",
                tags=None,
                memory=False,
                enforce_timeout=False,
                clock=None):
        """Add a task to database from url.
        @param url: url.
        @param timeout: selected timeout.
        @param package: analysis package.
        @param options: analysis options.
        @param priority: analysis priority.
        @param custom: custom options.
        @param machine: selected machine.
        @param platform: platform.
        @param tags: tags for machine selection
        @param memory: toggle full memory dump.
        @param enforce_timeout: toggle full timeout execution.
        @param clock: virtual machine clock time
        @return: ID of the new task, or None if it could not be stored.
        """
        # Empty timeout/priority values are normalised by add().
        return self.add(URL(url),
                        timeout=timeout,
                        package=package,
                        options=options,
                        priority=priority,
                        custom=custom,
                        machine=machine,
                        platform=platform,
                        tags=tags,
                        memory=memory,
                        enforce_timeout=enforce_timeout,
                        clock=clock)

    def reschedule(self, task_id):
        """Reschedule a task.
        The original task is marked as TASK_RECOVERED and a new task with
        the same parameters is added.
        @param task_id: ID of the task to reschedule.
        @return: ID of the newly created task, None if the task does not
                 exist or has an unsupported category, False if the status
                 update fails.
        """
        task = self.view_task(task_id)

        if not task:
            return None

        if task.category == "file":
            add = self.add_path
        elif task.category == "url":
            add = self.add_url
        else:
            # Bail out before touching the task: without this branch "add"
            # would be unbound below.
            log.error("Unable to reschedule task {0}: unsupported category "
                      "'{1}'".format(task_id, task.category))
            return None

        # Change status to recovered.
        session = self.Session()
        try:
            session.query(Task).get(task_id).status = TASK_RECOVERED
            session.commit()
        except SQLAlchemyError as e:
            log.debug("Database error rescheduling task: {0}".format(e))
            session.rollback()
            return False
        finally:
            session.close()

        # Normalize tags: add() expects a comma separated string.
        if task.tags:
            tags = ",".join([tag.name for tag in task.tags])
        else:
            tags = task.tags

        return add(task.target,
                   task.timeout,
                   task.package,
                   task.options,
                   task.priority,
                   task.custom,
                   task.machine,
                   task.platform,
                   tags,
                   task.memory,
                   task.enforce_timeout,
                   task.clock)

    def list_tasks(self, limit=None, details=False, category=None, offset=None, status=None, not_status=None):
        """Retrieve list of task.
        @param limit: specify a limit of entries.
        @param details: if guest, errors and tags must be loaded as well
        @param category: filter by category
        @param offset: list offset
        @param status: filter by task status
        @param not_status: exclude this task status from filter
        @return: list of tasks, newest first, or None on database error.
        """
        session = self.Session()
        try:
            search = session.query(Task)

            if status:
                search = search.filter(Task.status == status)
            if not_status:
                search = search.filter(Task.status != not_status)
            if category:
                search = search.filter(Task.category == category)
            if details:
                search = search.options(joinedload("guest"), joinedload("errors"), joinedload("tags"))

            tasks = search.order_by(Task.added_on.desc()).limit(limit).offset(offset).all()
        except SQLAlchemyError as e:
            log.debug("Database error listing tasks: {0}".format(e))
            return None
        finally:
            session.close()
        return tasks

    def count_tasks(self, status=None):
        """Count tasks in the database
        @param status: apply a filter according to the task status
        @return: number of tasks found, 0 on database error
        """
        session = self.Session()
        try:
            query = session.query(Task)
            if status:
                query = query.filter(Task.status == status)
            tasks_count = query.count()
        except SQLAlchemyError as e:
            log.debug("Database error counting tasks: {0}".format(e))
            return 0
        finally:
            session.close()
        return tasks_count

    def view_task(self, task_id, details=False):
        """Retrieve information on a task.
        @param task_id: ID of the task to query.
        @param details: if guest, errors and tags must be loaded as well
        @return: details on the task, or None if missing or on database
                 error.
        """
        session = self.Session()
        try:
            query = session.query(Task)
            if details:
                query = query.options(joinedload("guest"), joinedload("errors"), joinedload("tags"))
            task = query.get(task_id)
        except SQLAlchemyError as e:
            log.debug("Database error viewing task: {0}".format(e))
            return None
        else:
            # Detach the row from the session before it is closed.
            if task:
                session.expunge(task)
        finally:
            session.close()
        return task

    def delete_task(self, task_id):
        """Delete information on a task.
        @param task_id: ID of the task to query.
        @return: operation status.
        """
        session = self.Session()
        try:
            task = session.query(Task).get(task_id)
            session.delete(task)
            session.commit()
        except SQLAlchemyError as e:
            log.debug("Database error deleting task: {0}".format(e))
            session.rollback()
            return False
        finally:
            session.close()
        return True

    def view_sample(self, sample_id):
        """Retrieve information on a sample given a sample id.
        @param sample_id: ID of the sample to query.
        @return: details on the sample used in sample: sample_id.
        """
        session = self.Session()
        try:
            sample = session.query(Sample).get(sample_id)
        except AttributeError:
            return None
        except SQLAlchemyError as e:
            log.debug("Database error viewing task: {0}".format(e))
            return None
        else:
            # Detach the row from the session before it is closed.
            if sample:
                session.expunge(sample)
        finally:
            session.close()

        return sample

    def find_sample(self, md5=None, sha256=None):
        """Search a sample by MD5 or SHA256.
        When both hashes are given, only the MD5 is used.
        @param md5: md5 string
        @param sha256: sha256 string
        @return: first matching sample, or None when nothing matches, no
                 hash is given or a database error occurs
        """
        # Nothing to search for; without this guard "sample" would be
        # unbound below.
        if not md5 and not sha256:
            return None

        session = self.Session()
        try:
            if md5:
                sample = session.query(Sample).filter(Sample.md5 == md5).first()
            else:
                sample = session.query(Sample).filter(Sample.sha256 == sha256).first()
        except SQLAlchemyError as e:
            log.debug("Database error searching sample: {0}".format(e))
            return None
        else:
            # Detach the row from the session before it is closed.
            if sample:
                session.expunge(sample)
        finally:
            session.close()
        return sample

    def count_samples(self):
        """Counts the amount of samples in the database.
        @return: number of samples, 0 on database error
        """
        session = self.Session()
        try:
            sample_count = session.query(Sample).count()
        except SQLAlchemyError as e:
            log.debug("Database error counting samples: {0}".format(e))
            return 0
        finally:
            session.close()
        return sample_count

    def view_machine(self, name):
        """Show virtual machine.
        @param name: virtual machine name
        @return: virtual machine's details, or None if missing or on
                 database error
        """
        session = self.Session()
        try:
            machine = session.query(Machine).options(joinedload("tags")).filter(Machine.name == name).first()
        except SQLAlchemyError as e:
            log.debug("Database error viewing machine: {0}".format(e))
            return None
        else:
            # Detach the row from the session before it is closed.
            if machine:
                session.expunge(machine)
        finally:
            session.close()
        return machine

    def view_machine_by_label(self, label):
        """Show virtual machine.
        @param label: virtual machine label
        @return: virtual machine's details, or None if missing or on
                 database error
        """
        session = self.Session()
        try:
            machine = session.query(Machine).options(joinedload("tags")).filter(Machine.label == label).first()
        except SQLAlchemyError as e:
            log.debug("Database error viewing machine by label: {0}".format(e))
            return None
        else:
            # Detach the row from the session before it is closed.
            if machine:
                session.expunge(machine)
        finally:
            session.close()
        return machine

    def view_errors(self, task_id):
        """Get all errors related to a task.
        @param task_id: ID of task associated to the errors
        @return: list of errors, or None on database error.
        """
        session = self.Session()
        try:
            errors = session.query(Error).filter(Error.task_id == task_id).all()
        except SQLAlchemyError as e:
            log.debug("Database error viewing errors: {0}".format(e))
            return None
        finally:
            session.close()
        return errors