Package lib :: Package cuckoo :: Package core :: Module database
[hide private]
[frames] | [no frames]

Source Code for Module lib.cuckoo.core.database

   1  # Copyright (C) 2010-2013 Claudio Guarnieri. 
   2  # Copyright (C) 2014-2016 Cuckoo Foundation. 
   3  # This file is part of Cuckoo Sandbox - http://www.cuckoosandbox.org 
   4  # See the file 'docs/LICENSE' for copying permission. 
   5   
   6  import os 
   7  import json 
   8  import logging 
   9  from datetime import datetime 
  10   
  11  from lib.cuckoo.common.config import Config, parse_options, emit_options 
  12  from lib.cuckoo.common.constants import CUCKOO_ROOT 
  13  from lib.cuckoo.common.exceptions import CuckooDatabaseError 
  14  from lib.cuckoo.common.exceptions import CuckooOperationalError 
  15  from lib.cuckoo.common.exceptions import CuckooDependencyError 
  16  from lib.cuckoo.common.objects import File, URL, Dictionary 
  17  from lib.cuckoo.common.utils import create_folder, Singleton, classlock, SuperLock, json_encode 
  18   
  19  try: 
  20      from sqlalchemy import create_engine, Column, not_ 
  21      from sqlalchemy import Integer, String, Boolean, DateTime, Enum, func 
  22      from sqlalchemy import ForeignKey, Text, Index, Table 
  23      from sqlalchemy.ext.declarative import declarative_base 
  24      from sqlalchemy.exc import SQLAlchemyError, IntegrityError 
  25      from sqlalchemy.orm import sessionmaker, relationship, joinedload 
  26      from sqlalchemy.ext.hybrid import hybrid_property 
  27      Base = declarative_base() 
  28  except ImportError: 
  29      raise CuckooDependencyError( 
  30          "Unable to import sqlalchemy (install with `pip install sqlalchemy`)" 
  31      ) 
  32   
  33  log = logging.getLogger(__name__) 
  34   
# Database schema revision expected by this code; compared against the
# alembic_version table when the Database object is initialized.
SCHEMA_VERSION = "cd31654d187"

# Task life-cycle states stored in Task.status.
TASK_PENDING = "pending"
TASK_RUNNING = "running"
TASK_COMPLETED = "completed"
TASK_RECOVERED = "recovered"
TASK_REPORTED = "reported"
TASK_FAILED_ANALYSIS = "failed_analysis"
TASK_FAILED_PROCESSING = "failed_processing"
TASK_FAILED_REPORTING = "failed_reporting"

# Secondary table used in association Machine - Tag.
machines_tags = Table(
    "machines_tags", Base.metadata,
    Column("machine_id", Integer, ForeignKey("machines.id")),
    Column("tag_id", Integer, ForeignKey("tags.id"))
)

# Secondary table used in association Task - Tag.
tasks_tags = Table(
    "tasks_tags", Base.metadata,
    Column("task_id", Integer, ForeignKey("tasks.id")),
    Column("tag_id", Integer, ForeignKey("tags.id"))
)
class Machine(Base):
    """Configured virtual machines to be used as guests."""
    __tablename__ = "machines"

    id = Column(Integer(), primary_key=True)
    name = Column(String(255), nullable=False)
    label = Column(String(255), nullable=False)
    ip = Column(String(255), nullable=False)
    platform = Column(String(255), nullable=False)
    tags = relationship("Tag", secondary=machines_tags, single_parent=True,
                        backref="machine")
    options = Column(String(255), nullable=True)
    interface = Column(String(255), nullable=True)
    snapshot = Column(String(255), nullable=True)
    locked = Column(Boolean(), nullable=False, default=False)
    locked_changed_on = Column(DateTime(timezone=False), nullable=True)
    status = Column(String(255), nullable=True)
    status_changed_on = Column(DateTime(timezone=False), nullable=True)
    resultserver_ip = Column(String(255), nullable=False)
    resultserver_port = Column(String(255), nullable=False)

    def __init__(self, name, label, ip, platform, options, interface,
                 snapshot, resultserver_ip, resultserver_port):
        self.name = name
        self.label = label
        self.ip = ip
        self.platform = platform
        self.options = options
        self.interface = interface
        self.snapshot = snapshot
        self.resultserver_ip = resultserver_ip
        self.resultserver_port = resultserver_port

    def __repr__(self):
        return "<Machine('{0}','{1}')>".format(self.id, self.name)

    def to_dict(self):
        """Converts object to dict.
        @return: dict
        """
        d = {}
        for column in self.__table__.columns:
            value = getattr(self, column.name)
            if isinstance(value, datetime):
                d[column.name] = value.strftime("%Y-%m-%d %H:%M:%S")
            else:
                d[column.name] = value

        # Tags are a relation so no column to iterate.
        d["tags"] = [tag.name for tag in self.tags]
        return d

    def to_json(self):
        """Converts object to JSON.
        @return: JSON data
        """
        return json.dumps(self.to_dict())

    def is_analysis(self):
        """Is this an analysis machine? Generally speaking all machines are
        analysis machines, however, this is not the case for service VMs.
        Please refer to the services auxiliary module.
        @return: False for service VMs, True otherwise.
        """
        for tag in self.tags:
            if tag.name == "service":
                # Was an implicit "return" (None); return False explicitly.
                # Backward-compatible: callers test truthiness.
                return False
        return True
125
class Tag(Base):
    """Free-form label attached to machines and tasks."""
    __tablename__ = "tags"

    id = Column(Integer(), primary_key=True)
    name = Column(String(255), nullable=False, unique=True)

    def __init__(self, name):
        self.name = name

    def __repr__(self):
        return "<Tag('{0}','{1}')>".format(self.id, self.name)
138
class Guest(Base):
    """Tracks a single guest (analysis VM) run."""
    __tablename__ = "guests"

    id = Column(Integer(), primary_key=True)
    # TODO Replace the guest.status with a more generic Task.status solution.
    status = Column(String(16), nullable=False)
    name = Column(String(255), nullable=False)
    label = Column(String(255), nullable=False)
    manager = Column(String(255), nullable=False)
    started_on = Column(DateTime(timezone=False),
                        default=datetime.now,
                        nullable=False)
    shutdown_on = Column(DateTime(timezone=False), nullable=True)
    task_id = Column(Integer,
                     ForeignKey("tasks.id"),
                     nullable=False,
                     unique=True)

    def __init__(self, name, label, manager):
        self.name = name
        self.label = label
        self.manager = manager

    def __repr__(self):
        return "<Guest('{0}','{1}')>".format(self.id, self.name)

    def to_dict(self):
        """Converts object to dict.
        @return: dict
        """
        result = {}
        for column in self.__table__.columns:
            value = getattr(self, column.name)
            # Datetimes are serialized as "YYYY-MM-DD HH:MM:SS" strings.
            if isinstance(value, datetime):
                value = value.strftime("%Y-%m-%d %H:%M:%S")
            result[column.name] = value
        return result

    def to_json(self):
        """Converts object to JSON.
        @return: JSON data
        """
        return json.dumps(self.to_dict())
184
class Sample(Base):
    """Details of a submitted file, keyed by its hashes."""
    __tablename__ = "samples"

    id = Column(Integer(), primary_key=True)
    file_size = Column(Integer(), nullable=False)
    file_type = Column(Text(), nullable=False)
    md5 = Column(String(32), nullable=False)
    crc32 = Column(String(8), nullable=False)
    sha1 = Column(String(40), nullable=False)
    sha256 = Column(String(64), nullable=False)
    sha512 = Column(String(128), nullable=False)
    ssdeep = Column(String(255), nullable=True)
    __table_args__ = Index("hash_index", "md5", "crc32", "sha1",
                           "sha256", "sha512", unique=True),

    def __init__(self, md5, crc32, sha1, sha256, sha512,
                 file_size, file_type=None, ssdeep=None):
        self.md5 = md5
        self.crc32 = crc32
        self.sha1 = sha1
        self.sha256 = sha256
        self.sha512 = sha512
        self.file_size = file_size
        # Optional attributes are only set when provided.
        if file_type:
            self.file_type = file_type
        if ssdeep:
            self.ssdeep = ssdeep

    def __repr__(self):
        return "<Sample('{0}','{1}')>".format(self.id, self.sha256)

    def to_dict(self):
        """Converts object to dict.
        @return: dict
        """
        return dict((column.name, getattr(self, column.name))
                    for column in self.__table__.columns)

    def to_json(self):
        """Converts object to JSON.
        @return: JSON data
        """
        return json.dumps(self.to_dict())
231
class Error(Base):
    """Error message recorded against an analysis task."""
    __tablename__ = "errors"

    id = Column(Integer(), primary_key=True)
    message = Column(String(255), nullable=False)
    task_id = Column(Integer, ForeignKey("tasks.id"), nullable=False)

    def __init__(self, message, task_id):
        self.message = message
        self.task_id = task_id

    def __repr__(self):
        return "<Error('{0}','{1}','{2}')>".format(self.id, self.message, self.task_id)

    def to_dict(self):
        """Converts object to dict.
        @return: dict
        """
        return dict((column.name, getattr(self, column.name))
                    for column in self.__table__.columns)

    def to_json(self):
        """Converts object to JSON.
        @return: JSON data
        """
        return json.dumps(self.to_dict())
261
class Task(Base):
    """Analysis task queue."""
    __tablename__ = "tasks"

    id = Column(Integer(), primary_key=True)
    target = Column(Text(), nullable=False)
    category = Column(String(255), nullable=False)
    timeout = Column(Integer(), server_default="0", nullable=False)
    priority = Column(Integer(), server_default="1", nullable=False)
    custom = Column(String(255), nullable=True)
    owner = Column(String(64), nullable=True)
    machine = Column(String(255), nullable=True)
    package = Column(String(255), nullable=True)
    tags = relationship("Tag", secondary=tasks_tags, single_parent=True,
                        backref="task", lazy="subquery")
    _options = Column("options", String(255), nullable=True)
    platform = Column(String(255), nullable=True)
    memory = Column(Boolean, nullable=False, default=False)
    enforce_timeout = Column(Boolean, nullable=False, default=False)
    clock = Column(DateTime(timezone=False),
                   default=datetime.now,
                   nullable=False)
    added_on = Column(DateTime(timezone=False),
                      default=datetime.now,
                      nullable=False)
    started_on = Column(DateTime(timezone=False), nullable=True)
    completed_on = Column(DateTime(timezone=False), nullable=True)
    status = Column(Enum(TASK_PENDING, TASK_RUNNING, TASK_COMPLETED,
                         TASK_REPORTED, TASK_RECOVERED, TASK_FAILED_ANALYSIS,
                         TASK_FAILED_PROCESSING, TASK_FAILED_REPORTING, name="status_type"),
                    server_default=TASK_PENDING,
                    nullable=False)
    sample_id = Column(Integer, ForeignKey("samples.id"), nullable=True)
    processing = Column(String(16), nullable=True)
    route = Column(String(16), nullable=True)
    sample = relationship("Sample", backref="tasks")
    guest = relationship("Guest", uselist=False, backref="tasks", cascade="save-update, delete")
    errors = relationship("Error", backref="tasks", cascade="save-update, delete")

    def __init__(self, target=None):
        self.target = target

    def __repr__(self):
        return "<Task('{0}','{1}')>".format(self.id, self.target)

    def duration(self):
        """Seconds between start and completion, or -1 when either
        timestamp is missing."""
        if not self.started_on or not self.completed_on:
            return -1
        return (self.completed_on - self.started_on).seconds

    @hybrid_property
    def options(self):
        """Analysis options parsed into a dict (empty when unset)."""
        return parse_options(self._options) if self._options else {}

    @options.setter
    def options(self, value):
        # Stored verbatim in the "options" column.
        self._options = value

    def to_dict(self):
        """Converts object to dict.
        @return: dict
        """
        d = Dictionary()
        for column in self.__table__.columns:
            d[column.name] = getattr(self, column.name)

        # Relations and computed fields are not columns.
        d["tags"] = [tag.name for tag in self.tags]
        d["duration"] = self.duration()

        if self.guest:
            guest_info = self.guest.to_dict()
            # Strip fields that merely link back to this task.
            guest_info.pop("task_id")
            guest_info.pop("id")
            d["guest"] = guest_info
        else:
            d["guest"] = {}

        return d

    def to_json(self):
        """Converts object to JSON.
        @return: JSON data
        """
        return json_encode(self.to_dict())
350
class AlembicVersion(Base):
    """Table used to pinpoint actual database schema release."""
    __tablename__ = "alembic_version"

    # Single-row table holding the current alembic revision identifier;
    # compared against SCHEMA_VERSION in Database.__init__.
    version_num = Column(String(32), nullable=False, primary_key=True)
356
class Database(object):
    """Analysis queue database.

    This class handles the creation of the database user for internal queue
    management. It also provides some functions for interacting with it.
    """
    # Python 2 metaclass assignment: every instantiation returns the same
    # shared instance (see lib.cuckoo.common.utils.Singleton).
    __metaclass__ = Singleton
365 - def __init__(self, dsn=None, schema_check=True, echo=False):
366 """ 367 @param dsn: database connection string. 368 @param schema_check: disable or enable the db schema version check. 369 @param echo: echo sql queries. 370 """ 371 self._lock = SuperLock() 372 cfg = Config() 373 374 if dsn: 375 self._connect_database(dsn) 376 elif hasattr(cfg, "database") and cfg.database.connection: 377 self._connect_database(cfg.database.connection) 378 else: 379 db_file = os.path.join(CUCKOO_ROOT, "db", "cuckoo.db") 380 if not os.path.exists(db_file): 381 db_dir = os.path.dirname(db_file) 382 if not os.path.exists(db_dir): 383 try: 384 create_folder(folder=db_dir) 385 except CuckooOperationalError as e: 386 raise CuckooDatabaseError("Unable to create database directory: {0}".format(e)) 387 388 self._connect_database("sqlite:///%s" % db_file) 389 390 # Disable SQL logging. Turn it on for debugging. 391 self.engine.echo = echo 392 393 # Connection timeout. 394 if hasattr(cfg, "database") and cfg.database.timeout: 395 self.engine.pool_timeout = cfg.database.timeout 396 else: 397 self.engine.pool_timeout = 60 398 399 # Let's emit a warning just in case. 400 if not hasattr(cfg, "database"): 401 log.warning("It appears you don't have a valid `database` " 402 "section in conf/cuckoo.conf, using sqlite3 instead.") 403 404 # Create schema. 405 try: 406 Base.metadata.create_all(self.engine) 407 except SQLAlchemyError as e: 408 raise CuckooDatabaseError("Unable to create or connect to database: {0}".format(e)) 409 410 # Get db session. 411 self.Session = sessionmaker(bind=self.engine) 412 413 # Deal with schema versioning. 414 # TODO: it's a little bit dirty, needs refactoring. 415 tmp_session = self.Session() 416 if not tmp_session.query(AlembicVersion).count(): 417 # Set database schema version. 
418 tmp_session.add(AlembicVersion(version_num=SCHEMA_VERSION)) 419 try: 420 tmp_session.commit() 421 except SQLAlchemyError as e: 422 raise CuckooDatabaseError("Unable to set schema version: {0}".format(e)) 423 tmp_session.rollback() 424 finally: 425 tmp_session.close() 426 else: 427 # Check if db version is the expected one. 428 last = tmp_session.query(AlembicVersion).first() 429 tmp_session.close() 430 if last.version_num != SCHEMA_VERSION and schema_check: 431 raise CuckooDatabaseError( 432 "DB schema version mismatch: found %s, expected %s. " 433 "Try to apply all migrations (cd utils/db_migration/ && " 434 "alembic upgrade head)." % 435 (last.version_num, SCHEMA_VERSION) 436 )
437
438 - def __del__(self):
439 """Disconnects pool.""" 440 self.engine.dispose()
441
442 - def _connect_database(self, connection_string):
443 """Connect to a Database. 444 @param connection_string: Connection string specifying the database 445 """ 446 try: 447 # TODO: this is quite ugly, should improve. 448 if connection_string.startswith("sqlite"): 449 # Using "check_same_thread" to disable sqlite safety check on multiple threads. 450 self.engine = create_engine(connection_string, connect_args={"check_same_thread": False}) 451 elif connection_string.startswith("postgres"): 452 # Disabling SSL mode to avoid some errors using sqlalchemy and multiprocesing. 453 # See: http://www.postgresql.org/docs/9.0/static/libpq-ssl.html#LIBPQ-SSL-SSLMODE-STATEMENTS 454 self.engine = create_engine(connection_string, connect_args={"sslmode": "disable"}) 455 else: 456 self.engine = create_engine(connection_string) 457 except ImportError as e: 458 lib = e.message.split()[-1] 459 raise CuckooDependencyError( 460 "Missing database driver, unable to import %s (install with " 461 "`pip install %s`)" % (lib, lib) 462 )
463
464 - def _get_or_create(self, session, model, **kwargs):
465 """Get an ORM instance or create it if not exist. 466 @param session: SQLAlchemy session object 467 @param model: model to query 468 @return: row instance 469 """ 470 instance = session.query(model).filter_by(**kwargs).first() 471 return instance or model(**kwargs)
472 473 @classlock
474 - def drop(self):
475 """Drop all tables.""" 476 try: 477 Base.metadata.drop_all(self.engine) 478 except SQLAlchemyError as e: 479 raise CuckooDatabaseError("Unable to create or connect to database: {0}".format(e))
480 481 @classlock
482 - def clean_machines(self):
483 """Clean old stored machines and related tables.""" 484 # Secondary table. 485 # TODO: this is better done via cascade delete. 486 self.engine.execute(machines_tags.delete()) 487 488 session = self.Session() 489 try: 490 session.query(Machine).delete() 491 session.commit() 492 except SQLAlchemyError as e: 493 log.debug("Database error cleaning machines: {0}".format(e)) 494 session.rollback() 495 finally: 496 session.close()
497 498 @classlock
499 - def add_machine(self, name, label, ip, platform, options, tags, interface, 500 snapshot, resultserver_ip, resultserver_port):
501 """Add a guest machine. 502 @param name: machine id 503 @param label: machine label 504 @param ip: machine IP address 505 @param platform: machine supported platform 506 @param tags: list of comma separated tags 507 @param interface: sniffing interface for this machine 508 @param snapshot: snapshot name to use instead of the current one, if configured 509 @param resultserver_ip: IP address of the Result Server 510 @param resultserver_port: port of the Result Server 511 """ 512 session = self.Session() 513 machine = Machine(name=name, 514 label=label, 515 ip=ip, 516 platform=platform, 517 options=options, 518 interface=interface, 519 snapshot=snapshot, 520 resultserver_ip=resultserver_ip, 521 resultserver_port=resultserver_port) 522 523 # Deal with tags format (i.e., foo,bar,baz) 524 if tags: 525 for tag in tags.split(","): 526 if tag.strip(): 527 tag = self._get_or_create(session, Tag, name=tag.strip()) 528 machine.tags.append(tag) 529 session.add(machine) 530 531 try: 532 session.commit() 533 except SQLAlchemyError as e: 534 log.debug("Database error adding machine: {0}".format(e)) 535 session.rollback() 536 finally: 537 session.close()
538 539 @classlock
540 - def set_status(self, task_id, status):
541 """Set task status. 542 @param task_id: task identifier 543 @param status: status string 544 @return: operation status 545 """ 546 session = self.Session() 547 try: 548 row = session.query(Task).get(task_id) 549 if not row: 550 return 551 552 row.status = status 553 554 if status == TASK_RUNNING: 555 row.started_on = datetime.now() 556 elif status == TASK_COMPLETED: 557 row.completed_on = datetime.now() 558 559 session.commit() 560 except SQLAlchemyError as e: 561 log.debug("Database error setting status: {0}".format(e)) 562 session.rollback() 563 finally: 564 session.close()
565 566 @classlock
567 - def set_route(self, task_id, route):
568 """Set the taken route of this task. 569 @param task_id: task identifier 570 @param route: route string 571 @return: operation status 572 """ 573 session = self.Session() 574 try: 575 row = session.query(Task).get(task_id) 576 if not row: 577 return 578 579 row.route = route 580 session.commit() 581 except SQLAlchemyError as e: 582 log.debug("Database error setting route: {0}".format(e)) 583 session.rollback() 584 finally: 585 session.close()
586 587 @classlock
588 - def fetch(self, machine=None, service=True):
589 """Fetches a task waiting to be processed and locks it for running. 590 @return: None or task 591 """ 592 session = self.Session() 593 try: 594 q = session.query(Task).filter_by(status=TASK_PENDING) 595 596 if machine: 597 q = q.filter_by(machine=machine) 598 599 if not service: 600 q = q.filter(not_(Task.tags.any(name="service"))) 601 602 row = q.order_by(Task.priority.desc(), Task.added_on).first() 603 if row: 604 self.set_status(task_id=row.id, status=TASK_RUNNING) 605 session.refresh(row) 606 607 return row 608 except SQLAlchemyError as e: 609 log.debug("Database error fetching task: {0}".format(e)) 610 session.rollback() 611 finally: 612 session.close()
613 614 @classlock
615 - def guest_start(self, task_id, name, label, manager):
616 """Logs guest start. 617 @param task_id: task identifier 618 @param name: vm name 619 @param label: vm label 620 @param manager: vm manager 621 @return: guest row id 622 """ 623 session = self.Session() 624 guest = Guest(name, label, manager) 625 try: 626 guest.status = "init" 627 session.query(Task).get(task_id).guest = guest 628 session.commit() 629 session.refresh(guest) 630 return guest.id 631 except SQLAlchemyError as e: 632 log.debug("Database error logging guest start: {0}".format(e)) 633 session.rollback() 634 return None 635 finally: 636 session.close()
637 638 @classlock
639 - def guest_get_status(self, task_id):
640 """Logs guest start. 641 @param task_id: task id 642 @return: guest status 643 """ 644 session = self.Session() 645 try: 646 guest = session.query(Guest).filter_by(task_id=task_id).first() 647 return guest.status if guest else None 648 except SQLAlchemyError as e: 649 log.debug("Database error logging guest start: {0}".format(e)) 650 session.rollback() 651 return 652 finally: 653 session.close()
654 655 @classlock
656 - def guest_set_status(self, task_id, status):
657 """Logs guest start. 658 @param task_id: task identifier 659 @param status: status 660 """ 661 session = self.Session() 662 try: 663 guest = session.query(Guest).filter_by(task_id=task_id).first() 664 guest.status = status 665 session.commit() 666 session.refresh(guest) 667 except SQLAlchemyError as e: 668 log.debug("Database error logging guest start: {0}".format(e)) 669 session.rollback() 670 return None 671 finally: 672 session.close()
673 674 @classlock
675 - def guest_remove(self, guest_id):
676 """Removes a guest start entry.""" 677 session = self.Session() 678 try: 679 guest = session.query(Guest).get(guest_id) 680 session.delete(guest) 681 session.commit() 682 except SQLAlchemyError as e: 683 log.debug("Database error logging guest remove: {0}".format(e)) 684 session.rollback() 685 return None 686 finally: 687 session.close()
688 689 @classlock
690 - def guest_stop(self, guest_id):
691 """Logs guest stop. 692 @param guest_id: guest log entry id 693 """ 694 session = self.Session() 695 try: 696 guest = session.query(Guest).get(guest_id) 697 guest.status = "stopped" 698 guest.shutdown_on = datetime.now() 699 session.commit() 700 except SQLAlchemyError as e: 701 log.debug("Database error logging guest stop: {0}".format(e)) 702 session.rollback() 703 except TypeError: 704 log.warning("Data inconsistency in guests table detected, it might be a crash leftover. Continue") 705 session.rollback() 706 finally: 707 session.close()
708 709 @classlock
710 - def list_machines(self, locked=False):
711 """Lists virtual machines. 712 @return: list of virtual machines 713 """ 714 session = self.Session() 715 try: 716 if locked: 717 machines = session.query(Machine).options(joinedload("tags")).filter_by(locked=True).all() 718 else: 719 machines = session.query(Machine).options(joinedload("tags")).all() 720 return machines 721 except SQLAlchemyError as e: 722 log.debug("Database error listing machines: {0}".format(e)) 723 return [] 724 finally: 725 session.close()
726 727 @classlock
728 - def lock_machine(self, label=None, platform=None, tags=None):
729 """Places a lock on a free virtual machine. 730 @param label: optional virtual machine label 731 @param platform: optional virtual machine platform 732 @param tags: optional tags required (list) 733 @return: locked machine 734 """ 735 session = self.Session() 736 737 # Preventive checks. 738 if label and platform: 739 # Wrong usage. 740 log.error("You can select machine only by label or by platform.") 741 return None 742 elif label and tags: 743 # Also wrong usage. 744 log.error("You can select machine only by label or by tags.") 745 return None 746 747 try: 748 machines = session.query(Machine) 749 if label: 750 machines = machines.filter_by(label=label) 751 if platform: 752 machines = machines.filter_by(platform=platform) 753 if tags: 754 for tag in tags: 755 machines = machines.filter(Machine.tags.any(name=tag)) 756 757 # Check if there are any machines that satisfy the 758 # selection requirements. 759 if not machines.count(): 760 raise CuckooOperationalError("No machines match selection criteria.") 761 762 # Get the first free machine. 763 machine = machines.filter_by(locked=False).first() 764 except SQLAlchemyError as e: 765 log.debug("Database error locking machine: {0}".format(e)) 766 session.close() 767 return None 768 769 if machine: 770 machine.locked = True 771 machine.locked_changed_on = datetime.now() 772 try: 773 session.commit() 774 session.refresh(machine) 775 except SQLAlchemyError as e: 776 log.debug("Database error locking machine: {0}".format(e)) 777 session.rollback() 778 return None 779 finally: 780 session.close() 781 else: 782 session.close() 783 784 return machine
785 786 @classlock
787 - def unlock_machine(self, label):
788 """Remove lock form a virtual machine. 789 @param label: virtual machine label 790 @return: unlocked machine 791 """ 792 session = self.Session() 793 try: 794 machine = session.query(Machine).filter_by(label=label).first() 795 except SQLAlchemyError as e: 796 log.debug("Database error unlocking machine: {0}".format(e)) 797 session.close() 798 return None 799 800 if machine: 801 machine.locked = False 802 machine.locked_changed_on = datetime.now() 803 try: 804 session.commit() 805 session.refresh(machine) 806 except SQLAlchemyError as e: 807 log.debug("Database error locking machine: {0}".format(e)) 808 session.rollback() 809 return None 810 finally: 811 session.close() 812 813 return machine
814 815 @classlock
816 - def count_machines_available(self):
817 """How many virtual machines are ready for analysis. 818 @return: free virtual machines count 819 """ 820 session = self.Session() 821 try: 822 machines_count = session.query(Machine).filter_by(locked=False).count() 823 return machines_count 824 except SQLAlchemyError as e: 825 log.debug("Database error counting machines: {0}".format(e)) 826 return 0 827 finally: 828 session.close()
829 830 @classlock
831 - def get_available_machines(self):
832 """ Which machines are available 833 @return: free virtual machines 834 """ 835 session = self.Session() 836 try: 837 machines = session.query(Machine).options(joinedload("tags")).filter_by(locked=False).all() 838 return machines 839 except SQLAlchemyError as e: 840 log.debug("Database error getting available machines: {0}".format(e)) 841 return [] 842 finally: 843 session.close()
844 845 @classlock
846 - def set_machine_status(self, label, status):
847 """Set status for a virtual machine. 848 @param label: virtual machine label 849 @param status: new virtual machine status 850 """ 851 session = self.Session() 852 try: 853 machine = session.query(Machine).filter_by(label=label).first() 854 except SQLAlchemyError as e: 855 log.debug("Database error setting machine status: {0}".format(e)) 856 session.close() 857 return 858 859 if machine: 860 machine.status = status 861 machine.status_changed_on = datetime.now() 862 try: 863 session.commit() 864 session.refresh(machine) 865 except SQLAlchemyError as e: 866 log.debug("Database error setting machine status: {0}".format(e)) 867 session.rollback() 868 finally: 869 session.close() 870 else: 871 session.close()
872 873 @classlock
874 - def add_error(self, message, task_id):
875 """Add an error related to a task. 876 @param message: error message 877 @param task_id: ID of the related task 878 """ 879 session = self.Session() 880 error = Error(message=message, task_id=task_id) 881 session.add(error) 882 try: 883 session.commit() 884 except SQLAlchemyError as e: 885 log.debug("Database error adding error log: {0}".format(e)) 886 session.rollback() 887 finally: 888 session.close()
889 890 # The following functions are mostly used by external utils. 891 892 @classlock
893 - def add(self, obj, timeout=0, package="", options="", priority=1, 894 custom="", owner="", machine="", platform="", tags=None, 895 memory=False, enforce_timeout=False, clock=None, category=None):
896 """Add a task to database. 897 @param obj: object to add (File or URL). 898 @param timeout: selected timeout. 899 @param options: analysis options. 900 @param priority: analysis priority. 901 @param custom: custom options. 902 @param owner: task owner. 903 @param machine: selected machine. 904 @param platform: platform. 905 @param tags: optional tags that must be set for machine selection 906 @param memory: toggle full memory dump. 907 @param enforce_timeout: toggle full timeout execution. 908 @param clock: virtual machine clock time 909 @return: cursor or None. 910 """ 911 session = self.Session() 912 913 # Convert empty strings and None values to a valid int 914 if not timeout: 915 timeout = 0 916 if not priority: 917 priority = 1 918 919 if isinstance(obj, File): 920 sample = Sample(md5=obj.get_md5(), 921 crc32=obj.get_crc32(), 922 sha1=obj.get_sha1(), 923 sha256=obj.get_sha256(), 924 sha512=obj.get_sha512(), 925 file_size=obj.get_size(), 926 file_type=obj.get_type(), 927 ssdeep=obj.get_ssdeep()) 928 session.add(sample) 929 930 try: 931 session.commit() 932 except IntegrityError: 933 session.rollback() 934 try: 935 sample = session.query(Sample).filter_by(md5=obj.get_md5()).first() 936 except SQLAlchemyError as e: 937 log.debug("Error querying sample for hash: {0}".format(e)) 938 session.close() 939 return None 940 except SQLAlchemyError as e: 941 log.debug("Database error adding task: {0}".format(e)) 942 session.close() 943 return None 944 945 task = Task(obj.file_path) 946 task.sample_id = sample.id 947 elif isinstance(obj, URL): 948 task = Task(obj.url) 949 else: 950 task = Task("none") 951 952 task.category = category 953 task.timeout = timeout 954 task.package = package 955 task.options = options 956 task.priority = priority 957 task.custom = custom 958 task.owner = owner 959 task.machine = machine 960 task.platform = platform 961 task.memory = memory 962 task.enforce_timeout = enforce_timeout 963 964 # Deal with tags format (i.e., foo,bar,baz) 965 if 
tags: 966 for tag in tags.split(","): 967 tag = self._get_or_create(session, Tag, name=tag.strip()) 968 task.tags.append(tag) 969 970 if clock: 971 if isinstance(clock, str) or isinstance(clock, unicode): 972 try: 973 task.clock = datetime.strptime(clock, "%m-%d-%Y %H:%M:%S") 974 except ValueError: 975 log.warning("The date you specified has an invalid format, using current timestamp.") 976 task.clock = datetime.now() 977 else: 978 task.clock = clock 979 980 session.add(task) 981 982 try: 983 session.commit() 984 task_id = task.id 985 except SQLAlchemyError as e: 986 log.debug("Database error adding task: {0}".format(e)) 987 session.rollback() 988 return None 989 finally: 990 session.close() 991 992 return task_id
993
994 - def add_path(self, file_path, timeout=0, package="", options="", 995 priority=1, custom="", owner="", machine="", platform="", 996 tags=None, memory=False, enforce_timeout=False, clock=None):
997 """Add a task to database from file path. 998 @param file_path: sample path. 999 @param timeout: selected timeout. 1000 @param options: analysis options. 1001 @param priority: analysis priority. 1002 @param custom: custom options. 1003 @param owner: task owner. 1004 @param machine: selected machine. 1005 @param platform: platform. 1006 @param tags: Tags required in machine selection 1007 @param memory: toggle full memory dump. 1008 @param enforce_timeout: toggle full timeout execution. 1009 @param clock: virtual machine clock time 1010 @return: cursor or None. 1011 """ 1012 if not file_path or not os.path.exists(file_path): 1013 log.warning("File does not exist: %s.", file_path) 1014 return None 1015 1016 # Convert empty strings and None values to a valid int 1017 if not timeout: 1018 timeout = 0 1019 if not priority: 1020 priority = 1 1021 1022 return self.add(File(file_path), timeout, package, options, priority, 1023 custom, owner, machine, platform, tags, memory, 1024 enforce_timeout, clock, "file")
1025
1026 - def add_url(self, url, timeout=0, package="", options="", priority=1, 1027 custom="", owner="", machine="", platform="", tags=None, 1028 memory=False, enforce_timeout=False, clock=None):
1029 """Add a task to database from url. 1030 @param url: url. 1031 @param timeout: selected timeout. 1032 @param options: analysis options. 1033 @param priority: analysis priority. 1034 @param custom: custom options. 1035 @param owner: task owner. 1036 @param machine: selected machine. 1037 @param platform: platform. 1038 @param tags: tags for machine selection 1039 @param memory: toggle full memory dump. 1040 @param enforce_timeout: toggle full timeout execution. 1041 @param clock: virtual machine clock time 1042 @return: cursor or None. 1043 """ 1044 1045 # Convert empty strings and None values to a valid int 1046 if not timeout: 1047 timeout = 0 1048 if not priority: 1049 priority = 1 1050 1051 return self.add(URL(url), timeout, package, options, priority, 1052 custom, owner, machine, platform, tags, memory, 1053 enforce_timeout, clock, "url")
1054
1055 - def add_baseline(self, timeout=0, owner="", machine="", memory=False):
1056 """Add a baseline task to database. 1057 @param timeout: selected timeout. 1058 @param owner: task owner. 1059 @param machine: selected machine. 1060 @param memory: toggle full memory dump. 1061 @return: cursor or None. 1062 """ 1063 return self.add(None, timeout=timeout or 0, priority=999, owner=owner, 1064 machine=machine, memory=memory, category="baseline")
1065
1066 - def add_service(self, timeout, owner, tags):
1067 """Add a service task to database. 1068 @param timeout: selected timeout. 1069 @param owner: task owner. 1070 @param tags: task tags. 1071 @return: cursor or None. 1072 """ 1073 return self.add(None, timeout=timeout, priority=999, owner=owner, 1074 tags=tags, category="service")
1075
1076 - def add_reboot(self, task_id, timeout=0, options="", priority=1, 1077 owner="", machine="", platform="", tags=None, memory=False, 1078 enforce_timeout=False, clock=None):
1079 """Add a reboot task to database from an existing analysis. 1080 @param task_id: task id of existing analysis. 1081 @param timeout: selected timeout. 1082 @param options: analysis options. 1083 @param priority: analysis priority. 1084 @param owner: task owner. 1085 @param machine: selected machine. 1086 @param platform: platform. 1087 @param tags: tags for machine selection 1088 @param memory: toggle full memory dump. 1089 @param enforce_timeout: toggle full timeout execution. 1090 @param clock: virtual machine clock time 1091 @return: cursor or None. 1092 """ 1093 1094 # Convert empty strings and None values to a valid int 1095 if not timeout: 1096 timeout = 0 1097 if not priority: 1098 priority = 1 1099 1100 task = self.view_task(task_id) 1101 if not task or not os.path.exists(task.target): 1102 log.error( 1103 "Unable to add reboot analysis as the original task or its " 1104 "sample has already been deleted." 1105 ) 1106 return 1107 1108 # TODO Integrate the Reboot screen with the submission portal and 1109 # pass the parent task ID through as part of the "options". 1110 custom = "%s" % task_id 1111 1112 return self.add(File(task.target), timeout, "reboot", options, 1113 priority, custom, owner, machine, platform, tags, 1114 memory, enforce_timeout, clock, "file")
1115 1116 @classlock
1117 - def reschedule(self, task_id, priority=None):
1118 """Reschedule a task. 1119 @param task_id: ID of the task to reschedule. 1120 @return: ID of the newly created task. 1121 """ 1122 task = self.view_task(task_id) 1123 if not task: 1124 return 1125 1126 if task.category == "file": 1127 add = self.add_path 1128 elif task.category == "url": 1129 add = self.add_url 1130 else: 1131 return 1132 1133 # Change status to recovered. 1134 session = self.Session() 1135 session.query(Task).get(task_id).status = TASK_RECOVERED 1136 try: 1137 session.commit() 1138 except SQLAlchemyError as e: 1139 log.debug("Database error rescheduling task: {0}".format(e)) 1140 session.rollback() 1141 return False 1142 finally: 1143 session.close() 1144 1145 # Normalize tags. 1146 if task.tags: 1147 tags = ",".join(tag.name for tag in task.tags) 1148 else: 1149 tags = task.tags 1150 1151 # Assign a new priority. 1152 if priority: 1153 task.priority = priority 1154 1155 options = emit_options(task.options) 1156 return add(task.target, task.timeout, task.package, options, 1157 task.priority, task.custom, task.owner, task.machine, 1158 task.platform, tags, task.memory, task.enforce_timeout, 1159 task.clock)
1160
1161 - def list_tasks(self, limit=None, details=True, category=None, owner=None, 1162 offset=None, status=None, sample_id=None, not_status=None, 1163 completed_after=None, order_by=None):
1164 """Retrieve list of task. 1165 @param limit: specify a limit of entries. 1166 @param details: if details about must be included 1167 @param category: filter by category 1168 @param owner: task owner 1169 @param offset: list offset 1170 @param status: filter by task status 1171 @param sample_id: filter tasks for a sample 1172 @param not_status: exclude this task status from filter 1173 @param completed_after: only list tasks completed after this timestamp 1174 @param order_by: definition which field to sort by 1175 @return: list of tasks. 1176 """ 1177 session = self.Session() 1178 try: 1179 search = session.query(Task) 1180 1181 if status: 1182 search = search.filter_by(status=status) 1183 if not_status: 1184 search = search.filter(Task.status != not_status) 1185 if category: 1186 search = search.filter_by(category=category) 1187 if owner: 1188 search = search.filter_by(owner=owner) 1189 if details: 1190 search = search.options(joinedload("guest"), joinedload("errors"), joinedload("tags")) 1191 if sample_id is not None: 1192 search = search.filter_by(sample_id=sample_id) 1193 if completed_after: 1194 search = search.filter(Task.completed_on > completed_after) 1195 1196 if order_by is not None: 1197 search = search.order_by(order_by) 1198 else: 1199 search = search.order_by(Task.added_on.desc()) 1200 1201 tasks = search.limit(limit).offset(offset).all() 1202 return tasks 1203 except SQLAlchemyError as e: 1204 log.debug("Database error listing tasks: {0}".format(e)) 1205 return [] 1206 finally: 1207 session.close()
1208
1209 - def minmax_tasks(self):
1210 """Find tasks minimum and maximum 1211 @return: unix timestamps of minimum and maximum 1212 """ 1213 session = self.Session() 1214 try: 1215 _min = session.query(func.min(Task.started_on).label("min")).first() 1216 _max = session.query(func.max(Task.completed_on).label("max")).first() 1217 return int(_min[0].strftime("%s")), int(_max[0].strftime("%s")) 1218 except SQLAlchemyError as e: 1219 log.debug("Database error counting tasks: {0}".format(e)) 1220 return 0 1221 finally: 1222 session.close()
1223 1224 @classlock
1225 - def count_tasks(self, status=None):
1226 """Count tasks in the database 1227 @param status: apply a filter according to the task status 1228 @return: number of tasks found 1229 """ 1230 session = self.Session() 1231 try: 1232 if status: 1233 tasks_count = session.query(Task).filter_by(status=status).count() 1234 else: 1235 tasks_count = session.query(Task).count() 1236 return tasks_count 1237 except SQLAlchemyError as e: 1238 log.debug("Database error counting tasks: {0}".format(e)) 1239 return 0 1240 finally: 1241 session.close()
1242 1243 @classlock
1244 - def view_task(self, task_id, details=True):
1245 """Retrieve information on a task. 1246 @param task_id: ID of the task to query. 1247 @return: details on the task. 1248 """ 1249 session = self.Session() 1250 try: 1251 if details: 1252 task = session.query(Task).options(joinedload("guest"), joinedload("errors"), joinedload("tags")).get(task_id) 1253 else: 1254 task = session.query(Task).get(task_id) 1255 except SQLAlchemyError as e: 1256 log.debug("Database error viewing task: {0}".format(e)) 1257 return None 1258 else: 1259 if task: 1260 session.expunge(task) 1261 return task 1262 finally: 1263 session.close()
1264 1265 @classlock
1266 - def delete_task(self, task_id):
1267 """Delete information on a task. 1268 @param task_id: ID of the task to query. 1269 @return: operation status. 1270 """ 1271 session = self.Session() 1272 try: 1273 task = session.query(Task).get(task_id) 1274 session.delete(task) 1275 session.commit() 1276 except SQLAlchemyError as e: 1277 log.debug("Database error deleting task: {0}".format(e)) 1278 session.rollback() 1279 return False 1280 finally: 1281 session.close() 1282 return True
1283 1284 @classlock
1285 - def view_sample(self, sample_id):
1286 """Retrieve information on a sample given a sample id. 1287 @param sample_id: ID of the sample to query. 1288 @return: details on the sample used in sample: sample_id. 1289 """ 1290 session = self.Session() 1291 try: 1292 sample = session.query(Sample).get(sample_id) 1293 except AttributeError: 1294 return None 1295 except SQLAlchemyError as e: 1296 log.debug("Database error viewing task: {0}".format(e)) 1297 return None 1298 else: 1299 if sample: 1300 session.expunge(sample) 1301 finally: 1302 session.close() 1303 1304 return sample
1305 1306 @classlock
1307 - def find_sample(self, md5=None, sha256=None):
1308 """Search samples by MD5. 1309 @param md5: md5 string 1310 @return: matches list 1311 """ 1312 session = self.Session() 1313 try: 1314 if md5: 1315 sample = session.query(Sample).filter_by(md5=md5).first() 1316 elif sha256: 1317 sample = session.query(Sample).filter_by(sha256=sha256).first() 1318 except SQLAlchemyError as e: 1319 log.debug("Database error searching sample: {0}".format(e)) 1320 return None 1321 else: 1322 if sample: 1323 session.expunge(sample) 1324 finally: 1325 session.close() 1326 return sample
1327 1328 @classlock
1329 - def count_samples(self):
1330 """Counts the amount of samples in the database.""" 1331 session = self.Session() 1332 try: 1333 sample_count = session.query(Sample).count() 1334 except SQLAlchemyError as e: 1335 log.debug("Database error counting samples: {0}".format(e)) 1336 return 0 1337 finally: 1338 session.close() 1339 return sample_count
1340 1341 @classlock
1342 - def view_machine(self, name):
1343 """Show virtual machine. 1344 @params name: virtual machine name 1345 @return: virtual machine's details 1346 """ 1347 session = self.Session() 1348 try: 1349 machine = session.query(Machine).options(joinedload("tags")).filter_by(name=name).first() 1350 except SQLAlchemyError as e: 1351 log.debug("Database error viewing machine: {0}".format(e)) 1352 return None 1353 else: 1354 if machine: 1355 session.expunge(machine) 1356 finally: 1357 session.close() 1358 return machine
1359 1360 @classlock
1361 - def view_machine_by_label(self, label):
1362 """Show virtual machine. 1363 @params label: virtual machine label 1364 @return: virtual machine's details 1365 """ 1366 session = self.Session() 1367 try: 1368 machine = session.query(Machine).options(joinedload("tags")).filter_by(label=label).first() 1369 except SQLAlchemyError as e: 1370 log.debug("Database error viewing machine by label: {0}".format(e)) 1371 return None 1372 else: 1373 if machine: 1374 session.expunge(machine) 1375 finally: 1376 session.close() 1377 return machine
1378 1379 @classlock
1380 - def view_errors(self, task_id):
1381 """Get all errors related to a task. 1382 @param task_id: ID of task associated to the errors 1383 @return: list of errors. 1384 """ 1385 session = self.Session() 1386 try: 1387 errors = session.query(Error).filter_by(task_id=task_id).all() 1388 except SQLAlchemyError as e: 1389 log.debug("Database error viewing errors: {0}".format(e)) 1390 return [] 1391 finally: 1392 session.close() 1393 return errors
1394
1395 - def processing_get_task(self, instance):
1396 """Get an available task for processing.""" 1397 session = self.Session() 1398 1399 # Please feel free to sqlalchemize the following query, but I didn't 1400 # get that far. This seems to be doing a fine job for avoiding race 1401 # conditions - especially with the session.commit() thing. But any 1402 # improvements are welcome. 1403 # TODO We can get rid of the `processing` column once again by 1404 # introducing a "reporting" status, but this requires annoying 1405 # database migrations, so leaving that for another day. 1406 query = """ 1407 UPDATE tasks SET processing = :instance 1408 WHERE id IN ( 1409 SELECT id FROM tasks 1410 WHERE status = :status AND processing IS NULL 1411 ORDER BY priority DESC, id ASC LIMIT 1 FOR UPDATE 1412 ) 1413 RETURNING id 1414 """ 1415 1416 try: 1417 params = { 1418 "instance": instance, 1419 "status": TASK_COMPLETED, 1420 } 1421 task = session.execute(query, params).first() 1422 session.commit() 1423 return task[0] if task else None 1424 except SQLAlchemyError as e: 1425 log.debug("Database error getting new processing tasks: %s", e) 1426 finally: 1427 session.close()
1428