1
2
3
4
5
6 import os
7 import json
8 import logging
9 from datetime import datetime
10
11 from lib.cuckoo.common.config import Config, parse_options, emit_options
12 from lib.cuckoo.common.constants import CUCKOO_ROOT
13 from lib.cuckoo.common.exceptions import CuckooDatabaseError
14 from lib.cuckoo.common.exceptions import CuckooOperationalError
15 from lib.cuckoo.common.exceptions import CuckooDependencyError
16 from lib.cuckoo.common.objects import File, URL, Dictionary
17 from lib.cuckoo.common.utils import create_folder, Singleton, classlock, SuperLock, json_encode
18
19 try:
20 from sqlalchemy import create_engine, Column, not_
21 from sqlalchemy import Integer, String, Boolean, DateTime, Enum, func
22 from sqlalchemy import ForeignKey, Text, Index, Table
23 from sqlalchemy.ext.declarative import declarative_base
24 from sqlalchemy.exc import SQLAlchemyError, IntegrityError
25 from sqlalchemy.orm import sessionmaker, relationship, joinedload
26 from sqlalchemy.ext.hybrid import hybrid_property
27 Base = declarative_base()
28 except ImportError:
29 raise CuckooDependencyError(
30 "Unable to import sqlalchemy (install with `pip install sqlalchemy`)"
31 )
32
log = logging.getLogger(__name__)

# Alembic revision this code expects the database schema to be at; checked
# against the alembic_version table in Database.__init__.
SCHEMA_VERSION = "cd31654d187"

# Valid values for the tasks.status column, covering the whole task life
# cycle from submission through analysis, processing and reporting.
TASK_PENDING = "pending"
TASK_RUNNING = "running"
TASK_COMPLETED = "completed"
TASK_RECOVERED = "recovered"
TASK_REPORTED = "reported"
TASK_FAILED_ANALYSIS = "failed_analysis"
TASK_FAILED_PROCESSING = "failed_processing"
TASK_FAILED_REPORTING = "failed_reporting"
44
45
# Association table for the many-to-many relationship between machines
# and tags (see Machine.tags).
machines_tags = Table(
    "machines_tags", Base.metadata,
    Column("machine_id", Integer, ForeignKey("machines.id")),
    Column("tag_id", Integer, ForeignKey("tags.id"))
)
51
52
# Association table for the many-to-many relationship between tasks
# and tags (see Task.tags).
tasks_tags = Table(
    "tasks_tags", Base.metadata,
    Column("task_id", Integer, ForeignKey("tasks.id")),
    Column("tag_id", Integer, ForeignKey("tags.id"))
)
60 """Configured virtual machines to be used as guests."""
61 __tablename__ = "machines"
62
63 id = Column(Integer(), primary_key=True)
64 name = Column(String(255), nullable=False)
65 label = Column(String(255), nullable=False)
66 ip = Column(String(255), nullable=False)
67 platform = Column(String(255), nullable=False)
68 tags = relationship("Tag", secondary=machines_tags, single_parent=True,
69 backref="machine")
70 options = Column(String(255), nullable=True)
71 interface = Column(String(255), nullable=True)
72 snapshot = Column(String(255), nullable=True)
73 locked = Column(Boolean(), nullable=False, default=False)
74 locked_changed_on = Column(DateTime(timezone=False), nullable=True)
75 status = Column(String(255), nullable=True)
76 status_changed_on = Column(DateTime(timezone=False), nullable=True)
77 resultserver_ip = Column(String(255), nullable=False)
78 resultserver_port = Column(String(255), nullable=False)
79
81 return "<Machine('{0}','{1}')>".format(self.id, self.name)
82
84 """Converts object to dict.
85 @return: dict
86 """
87 d = {}
88 for column in self.__table__.columns:
89 value = getattr(self, column.name)
90 if isinstance(value, datetime):
91 d[column.name] = value.strftime("%Y-%m-%d %H:%M:%S")
92 else:
93 d[column.name] = value
94
95
96 d["tags"] = [tag.name for tag in self.tags]
97 return d
98
100 """Converts object to JSON.
101 @return: JSON data
102 """
103 return json.dumps(self.to_dict())
104
106 """Is this an analysis machine? Generally speaking all machines are
107 analysis machines, however, this is not the case for service VMs.
108 Please refer to the services auxiliary module."""
109 for tag in self.tags:
110 if tag.name == "service":
111 return
112 return True
113
114 - def __init__(self, name, label, ip, platform, options, interface,
115 snapshot, resultserver_ip, resultserver_port):
125
127 """Tag describing anything you want."""
128 __tablename__ = "tags"
129
130 id = Column(Integer(), primary_key=True)
131 name = Column(String(255), nullable=False, unique=True)
132
134 return "<Tag('{0}','{1}')>".format(self.id, self.name)
135
138
140 """Tracks guest run."""
141 __tablename__ = "guests"
142
143 id = Column(Integer(), primary_key=True)
144
145 status = Column(String(16), nullable=False)
146 name = Column(String(255), nullable=False)
147 label = Column(String(255), nullable=False)
148 manager = Column(String(255), nullable=False)
149 started_on = Column(DateTime(timezone=False),
150 default=datetime.now,
151 nullable=False)
152 shutdown_on = Column(DateTime(timezone=False), nullable=True)
153 task_id = Column(Integer,
154 ForeignKey("tasks.id"),
155 nullable=False,
156 unique=True)
157
159 return "<Guest('{0}','{1}')>".format(self.id, self.name)
160
162 """Converts object to dict.
163 @return: dict
164 """
165 d = {}
166 for column in self.__table__.columns:
167 value = getattr(self, column.name)
168 if isinstance(value, datetime):
169 d[column.name] = value.strftime("%Y-%m-%d %H:%M:%S")
170 else:
171 d[column.name] = value
172 return d
173
175 """Converts object to JSON.
176 @return: JSON data
177 """
178 return json.dumps(self.to_dict())
179
180 - def __init__(self, name, label, manager):
184
186 """Submitted files details."""
187 __tablename__ = "samples"
188
189 id = Column(Integer(), primary_key=True)
190 file_size = Column(Integer(), nullable=False)
191 file_type = Column(Text(), nullable=False)
192 md5 = Column(String(32), nullable=False)
193 crc32 = Column(String(8), nullable=False)
194 sha1 = Column(String(40), nullable=False)
195 sha256 = Column(String(64), nullable=False)
196 sha512 = Column(String(128), nullable=False)
197 ssdeep = Column(String(255), nullable=True)
198 __table_args__ = Index("hash_index", "md5", "crc32", "sha1",
199 "sha256", "sha512", unique=True),
200
202 return "<Sample('{0}','{1}')>".format(self.id, self.sha256)
203
205 """Converts object to dict.
206 @return: dict
207 """
208 d = {}
209 for column in self.__table__.columns:
210 d[column.name] = getattr(self, column.name)
211 return d
212
214 """Converts object to JSON.
215 @return: JSON data
216 """
217 return json.dumps(self.to_dict())
218
219 - def __init__(self, md5, crc32, sha1, sha256, sha512,
220 file_size, file_type=None, ssdeep=None):
231
233 """Analysis errors."""
234 __tablename__ = "errors"
235
236 id = Column(Integer(), primary_key=True)
237 message = Column(String(255), nullable=False)
238 task_id = Column(Integer, ForeignKey("tasks.id"), nullable=False)
239
241 """Converts object to dict.
242 @return: dict
243 """
244 d = {}
245 for column in self.__table__.columns:
246 d[column.name] = getattr(self, column.name)
247 return d
248
250 """Converts object to JSON.
251 @return: JSON data
252 """
253 return json.dumps(self.to_dict())
254
258
260 return "<Error('{0}','{1}','{2}')>".format(self.id, self.message, self.task_id)
261
263 """Analysis task queue."""
264 __tablename__ = "tasks"
265
266 id = Column(Integer(), primary_key=True)
267 target = Column(Text(), nullable=False)
268 category = Column(String(255), nullable=False)
269 timeout = Column(Integer(), server_default="0", nullable=False)
270 priority = Column(Integer(), server_default="1", nullable=False)
271 custom = Column(String(255), nullable=True)
272 owner = Column(String(64), nullable=True)
273 machine = Column(String(255), nullable=True)
274 package = Column(String(255), nullable=True)
275 tags = relationship("Tag", secondary=tasks_tags, single_parent=True,
276 backref="task", lazy="subquery")
277 _options = Column("options", String(255), nullable=True)
278 platform = Column(String(255), nullable=True)
279 memory = Column(Boolean, nullable=False, default=False)
280 enforce_timeout = Column(Boolean, nullable=False, default=False)
281 clock = Column(DateTime(timezone=False),
282 default=datetime.now,
283 nullable=False)
284 added_on = Column(DateTime(timezone=False),
285 default=datetime.now,
286 nullable=False)
287 started_on = Column(DateTime(timezone=False), nullable=True)
288 completed_on = Column(DateTime(timezone=False), nullable=True)
289 status = Column(Enum(TASK_PENDING, TASK_RUNNING, TASK_COMPLETED,
290 TASK_REPORTED, TASK_RECOVERED, TASK_FAILED_ANALYSIS,
291 TASK_FAILED_PROCESSING, TASK_FAILED_REPORTING, name="status_type"),
292 server_default=TASK_PENDING,
293 nullable=False)
294 sample_id = Column(Integer, ForeignKey("samples.id"), nullable=True)
295 processing = Column(String(16), nullable=True)
296 route = Column(String(16), nullable=True)
297 sample = relationship("Sample", backref="tasks")
298 guest = relationship("Guest", uselist=False, backref="tasks", cascade="save-update, delete")
299 errors = relationship("Error", backref="tasks", cascade="save-update, delete")
300
305
306 @hybrid_property
311
312 @options.setter
315
317 """Converts object to dict.
318 @return: dict
319 """
320 d = Dictionary()
321 for column in self.__table__.columns:
322 value = getattr(self, column.name)
323 d[column.name] = value
324
325
326 d["tags"] = [tag.name for tag in self.tags]
327 d["duration"] = self.duration()
328 d["guest"] = {}
329
330 if self.guest:
331
332 d["guest"] = machine = self.guest.to_dict()
333
334 del machine["task_id"]
335 del machine["id"]
336
337 return d
338
340 """Converts object to JSON.
341 @return: JSON data
342 """
343 return json_encode(self.to_dict())
344
347
349 return "<Task('{0}','{1}')>".format(self.id, self.target)
350
352 """Table used to pinpoint actual database schema release."""
353 __tablename__ = "alembic_version"
354
355 version_num = Column(String(32), nullable=False, primary_key=True)
356
358 """Analysis queue database.
359
360 This class handles the creation of the database user for internal queue
361 management. It also provides some functions for interacting with it.
362 """
363 __metaclass__ = Singleton
364
def __init__(self, dsn=None, schema_check=True, echo=False):
    """Set up the database connection and make sure the schema exists.
    @param dsn: database connection string.
    @param schema_check: disable or enable the db schema version check.
    @param echo: echo sql queries.
    @raise CuckooDatabaseError: if the database directory cannot be
        created, the connection/schema creation fails, the schema version
        cannot be stamped, or (with schema_check) the versions mismatch.
    """
    self._lock = SuperLock()
    cfg = Config()

    # Pick the connection in order of preference: explicit dsn argument,
    # the [database] section of conf/cuckoo.conf, bundled sqlite3 file.
    if dsn:
        self._connect_database(dsn)
    elif hasattr(cfg, "database") and cfg.database.connection:
        self._connect_database(cfg.database.connection)
    else:
        db_file = os.path.join(CUCKOO_ROOT, "db", "cuckoo.db")
        if not os.path.exists(db_file):
            db_dir = os.path.dirname(db_file)
            if not os.path.exists(db_dir):
                try:
                    create_folder(folder=db_dir)
                except CuckooOperationalError as e:
                    raise CuckooDatabaseError("Unable to create database directory: {0}".format(e))

        self._connect_database("sqlite:///%s" % db_file)

    # Disable SQL logging unless explicitly requested (debugging aid).
    self.engine.echo = echo

    # Connection pool timeout, defaulting to 60 seconds when the config
    # does not provide one.
    if hasattr(cfg, "database") and cfg.database.timeout:
        self.engine.pool_timeout = cfg.database.timeout
    else:
        self.engine.pool_timeout = 60

    if not hasattr(cfg, "database"):
        log.warning("It appears you don't have a valid `database` "
                    "section in conf/cuckoo.conf, using sqlite3 instead.")

    # Create the tables if they do not exist yet.
    try:
        Base.metadata.create_all(self.engine)
    except SQLAlchemyError as e:
        raise CuckooDatabaseError("Unable to create or connect to database: {0}".format(e))

    # Session factory used by every database operation in this class.
    self.Session = sessionmaker(bind=self.engine)

    # Stamp a fresh database with the current schema version, or verify
    # that an existing database matches what this code expects.
    tmp_session = self.Session()
    if not tmp_session.query(AlembicVersion).count():
        # Set database schema version.
        tmp_session.add(AlembicVersion(version_num=SCHEMA_VERSION))
        try:
            tmp_session.commit()
        except SQLAlchemyError as e:
            # BUGFIX: roll back the failed transaction *before* raising.
            # The previous code placed the rollback after the raise,
            # making it unreachable dead code.
            tmp_session.rollback()
            raise CuckooDatabaseError("Unable to set schema version: {0}".format(e))
        finally:
            tmp_session.close()
    else:
        # Check if db version matches code version.
        last = tmp_session.query(AlembicVersion).first()
        tmp_session.close()
        if last.version_num != SCHEMA_VERSION and schema_check:
            raise CuckooDatabaseError(
                "DB schema version mismatch: found %s, expected %s. "
                "Try to apply all migrations (cd utils/db_migration/ && "
                "alembic upgrade head)." %
                (last.version_num, SCHEMA_VERSION)
            )
437
439 """Disconnects pool."""
440 self.engine.dispose()
441
443 """Connect to a Database.
444 @param connection_string: Connection string specifying the database
445 """
446 try:
447
448 if connection_string.startswith("sqlite"):
449
450 self.engine = create_engine(connection_string, connect_args={"check_same_thread": False})
451 elif connection_string.startswith("postgres"):
452
453
454 self.engine = create_engine(connection_string, connect_args={"sslmode": "disable"})
455 else:
456 self.engine = create_engine(connection_string)
457 except ImportError as e:
458 lib = e.message.split()[-1]
459 raise CuckooDependencyError(
460 "Missing database driver, unable to import %s (install with "
461 "`pip install %s`)" % (lib, lib)
462 )
463
465 """Get an ORM instance or create it if not exist.
466 @param session: SQLAlchemy session object
467 @param model: model to query
468 @return: row instance
469 """
470 instance = session.query(model).filter_by(**kwargs).first()
471 return instance or model(**kwargs)
472
473 @classlock
475 """Drop all tables."""
476 try:
477 Base.metadata.drop_all(self.engine)
478 except SQLAlchemyError as e:
479 raise CuckooDatabaseError("Unable to create or connect to database: {0}".format(e))
480
481 @classlock
483 """Clean old stored machines and related tables."""
484
485
486 self.engine.execute(machines_tags.delete())
487
488 session = self.Session()
489 try:
490 session.query(Machine).delete()
491 session.commit()
492 except SQLAlchemyError as e:
493 log.debug("Database error cleaning machines: {0}".format(e))
494 session.rollback()
495 finally:
496 session.close()
497
@classlock
def add_machine(self, name, label, ip, platform, options, tags, interface,
                snapshot, resultserver_ip, resultserver_port):
    """Register a guest machine in the database.
    @param name: machine id
    @param label: machine label
    @param ip: machine IP address
    @param platform: machine supported platform
    @param options: machine options string
    @param tags: list of comma separated tags
    @param interface: sniffing interface for this machine
    @param snapshot: snapshot name to use instead of the current one, if configured
    @param resultserver_ip: IP address of the Result Server
    @param resultserver_port: port of the Result Server
    """
    session = self.Session()
    machine = Machine(
        name=name,
        label=label,
        ip=ip,
        platform=platform,
        options=options,
        interface=interface,
        snapshot=snapshot,
        resultserver_ip=resultserver_ip,
        resultserver_port=resultserver_port,
    )

    # Tags arrive as a comma separated string (e.g. "foo,bar,baz");
    # empty entries are skipped, existing Tag rows are reused.
    if tags:
        for tag_name in (t.strip() for t in tags.split(",")):
            if tag_name:
                machine.tags.append(
                    self._get_or_create(session, Tag, name=tag_name)
                )

    session.add(machine)
    try:
        session.commit()
    except SQLAlchemyError as e:
        log.debug("Database error adding machine: {0}".format(e))
        session.rollback()
    finally:
        session.close()
538
539 @classlock
565
566 @classlock
568 """Set the taken route of this task.
569 @param task_id: task identifier
570 @param route: route string
571 @return: operation status
572 """
573 session = self.Session()
574 try:
575 row = session.query(Task).get(task_id)
576 if not row:
577 return
578
579 row.route = route
580 session.commit()
581 except SQLAlchemyError as e:
582 log.debug("Database error setting route: {0}".format(e))
583 session.rollback()
584 finally:
585 session.close()
586
587 @classlock
588 - def fetch(self, machine=None, service=True):
613
614 @classlock
637
638 @classlock
654
655 @classlock
673
674 @classlock
688
689 @classlock
691 """Logs guest stop.
692 @param guest_id: guest log entry id
693 """
694 session = self.Session()
695 try:
696 guest = session.query(Guest).get(guest_id)
697 guest.status = "stopped"
698 guest.shutdown_on = datetime.now()
699 session.commit()
700 except SQLAlchemyError as e:
701 log.debug("Database error logging guest stop: {0}".format(e))
702 session.rollback()
703 except TypeError:
704 log.warning("Data inconsistency in guests table detected, it might be a crash leftover. Continue")
705 session.rollback()
706 finally:
707 session.close()
708
709 @classlock
726
727 @classlock
728 - def lock_machine(self, label=None, platform=None, tags=None):
785
786 @classlock
814
815 @classlock
817 """How many virtual machines are ready for analysis.
818 @return: free virtual machines count
819 """
820 session = self.Session()
821 try:
822 machines_count = session.query(Machine).filter_by(locked=False).count()
823 return machines_count
824 except SQLAlchemyError as e:
825 log.debug("Database error counting machines: {0}".format(e))
826 return 0
827 finally:
828 session.close()
829
830 @classlock
832 """ Which machines are available
833 @return: free virtual machines
834 """
835 session = self.Session()
836 try:
837 machines = session.query(Machine).options(joinedload("tags")).filter_by(locked=False).all()
838 return machines
839 except SQLAlchemyError as e:
840 log.debug("Database error getting available machines: {0}".format(e))
841 return []
842 finally:
843 session.close()
844
845 @classlock
872
873 @classlock
889
890
891
@classlock
def add(self, obj, timeout=0, package="", options="", priority=1,
        custom="", owner="", machine="", platform="", tags=None,
        memory=False, enforce_timeout=False, clock=None, category=None):
    """Add a task to database.
    @param obj: object to add (File or URL).
    @param timeout: selected timeout.
    @param package: analysis package to run.
    @param options: analysis options.
    @param priority: analysis priority.
    @param custom: custom options.
    @param owner: task owner.
    @param machine: selected machine.
    @param platform: platform.
    @param tags: optional tags that must be set for machine selection
    @param memory: toggle full memory dump.
    @param enforce_timeout: toggle full timeout execution.
    @param clock: virtual machine clock time
    @param category: task category (e.g. "file", "url"; callers in this
        module also pass "baseline" and "service").
    @return: cursor or None.
    """
    session = self.Session()

    # Convert empty strings and None values to a valid int.
    if not timeout:
        timeout = 0
    if not priority:
        priority = 1

    if isinstance(obj, File):
        # Record the sample's hashes and metadata alongside the task.
        sample = Sample(md5=obj.get_md5(),
                        crc32=obj.get_crc32(),
                        sha1=obj.get_sha1(),
                        sha256=obj.get_sha256(),
                        sha512=obj.get_sha512(),
                        file_size=obj.get_size(),
                        file_type=obj.get_type(),
                        ssdeep=obj.get_ssdeep())
        session.add(sample)

        try:
            session.commit()
        except IntegrityError:
            # A sample with these hashes already exists (the samples table
            # has a unique index over md5..sha512): reuse the existing row.
            session.rollback()
            try:
                sample = session.query(Sample).filter_by(md5=obj.get_md5()).first()
            except SQLAlchemyError as e:
                log.debug("Error querying sample for hash: {0}".format(e))
                session.close()
                return None
        except SQLAlchemyError as e:
            log.debug("Database error adding task: {0}".format(e))
            session.close()
            return None

        # The task target is the sample's path on disk.
        task = Task(obj.file_path)
        task.sample_id = sample.id
    elif isinstance(obj, URL):
        task = Task(obj.url)
    else:
        # E.g. baseline and service tasks have no real target.
        task = Task("none")

    task.category = category
    task.timeout = timeout
    task.package = package
    task.options = options
    task.priority = priority
    task.custom = custom
    task.owner = owner
    task.machine = machine
    task.platform = platform
    task.memory = memory
    task.enforce_timeout = enforce_timeout

    # Deal with tags format (i.e., foo,bar,baz).
    if tags:
        for tag in tags.split(","):
            tag = self._get_or_create(session, Tag, name=tag.strip())
            task.tags.append(tag)

    if clock:
        # Python 2: the clock may arrive as str or unicode; anything else
        # is assumed to already be a datetime-like value.
        if isinstance(clock, str) or isinstance(clock, unicode):
            try:
                task.clock = datetime.strptime(clock, "%m-%d-%Y %H:%M:%S")
            except ValueError:
                log.warning("The date you specified has an invalid format, using current timestamp.")
                task.clock = datetime.now()
        else:
            task.clock = clock

    session.add(task)

    try:
        session.commit()
        # Grab the id before the session is closed and the object expires.
        task_id = task.id
    except SQLAlchemyError as e:
        log.debug("Database error adding task: {0}".format(e))
        session.rollback()
        return None
    finally:
        session.close()

    return task_id
993
def add_path(self, file_path, timeout=0, package="", options="",
             priority=1, custom="", owner="", machine="", platform="",
             tags=None, memory=False, enforce_timeout=False, clock=None):
    """Create an analysis task for a file on disk.
    @param file_path: sample path.
    @param timeout: selected timeout.
    @param package: analysis package to run.
    @param options: analysis options.
    @param priority: analysis priority.
    @param custom: custom options.
    @param owner: task owner.
    @param machine: selected machine.
    @param platform: platform.
    @param tags: Tags required in machine selection
    @param memory: toggle full memory dump.
    @param enforce_timeout: toggle full timeout execution.
    @param clock: virtual machine clock time
    @return: cursor or None.
    """
    # Refuse targets that are missing or empty paths.
    if not (file_path and os.path.exists(file_path)):
        log.warning("File does not exist: %s.", file_path)
        return None

    # Normalize falsy timeout/priority values to sane defaults.
    timeout = timeout or 0
    priority = priority or 1

    return self.add(File(file_path), timeout, package, options, priority,
                    custom, owner, machine, platform, tags, memory,
                    enforce_timeout, clock, "file")
1025
def add_url(self, url, timeout=0, package="", options="", priority=1,
            custom="", owner="", machine="", platform="", tags=None,
            memory=False, enforce_timeout=False, clock=None):
    """Create an analysis task for a URL target.
    @param url: url.
    @param timeout: selected timeout.
    @param package: analysis package to run.
    @param options: analysis options.
    @param priority: analysis priority.
    @param custom: custom options.
    @param owner: task owner.
    @param machine: selected machine.
    @param platform: platform.
    @param tags: tags for machine selection
    @param memory: toggle full memory dump.
    @param enforce_timeout: toggle full timeout execution.
    @param clock: virtual machine clock time
    @return: cursor or None.
    """
    # Normalize falsy timeout/priority values to sane defaults.
    timeout = timeout or 0
    priority = priority or 1

    return self.add(URL(url), timeout, package, options, priority,
                    custom, owner, machine, platform, tags, memory,
                    enforce_timeout, clock, "url")
1054
def add_baseline(self, timeout=0, owner="", machine="", memory=False):
    """Queue a baseline task (no target to analyze).
    @param timeout: selected timeout.
    @param owner: task owner.
    @param machine: selected machine.
    @param memory: toggle full memory dump.
    @return: cursor or None.
    """
    # Baseline tasks carry priority 999 and the "baseline" category.
    return self.add(
        None, timeout=timeout or 0, priority=999, owner=owner,
        machine=machine, memory=memory, category="baseline"
    )
1065
1067 """Add a service task to database.
1068 @param timeout: selected timeout.
1069 @param owner: task owner.
1070 @param tags: task tags.
1071 @return: cursor or None.
1072 """
1073 return self.add(None, timeout=timeout, priority=999, owner=owner,
1074 tags=tags, category="service")
1075
def add_reboot(self, task_id, timeout=0, options="", priority=1,
               owner="", machine="", platform="", tags=None, memory=False,
               enforce_timeout=False, clock=None):
    """Queue a reboot analysis derived from an existing task.
    @param task_id: task id of existing analysis.
    @param timeout: selected timeout.
    @param options: analysis options.
    @param priority: analysis priority.
    @param owner: task owner.
    @param machine: selected machine.
    @param platform: platform.
    @param tags: tags for machine selection
    @param memory: toggle full memory dump.
    @param enforce_timeout: toggle full timeout execution.
    @param clock: virtual machine clock time
    @return: cursor or None.
    """
    # Normalize falsy timeout/priority values to sane defaults.
    timeout = timeout or 0
    priority = priority or 1

    # The original task and its sample must still exist on disk.
    task = self.view_task(task_id)
    if not task or not os.path.exists(task.target):
        log.error(
            "Unable to add reboot analysis as the original task or its "
            "sample has already been deleted."
        )
        return

    # Stash the originating task id in the custom field.
    custom = "%s" % task_id

    return self.add(File(task.target), timeout, "reboot", options,
                    priority, custom, owner, machine, platform, tags,
                    memory, enforce_timeout, clock, "file")
1115
1116 @classlock
1118 """Reschedule a task.
1119 @param task_id: ID of the task to reschedule.
1120 @return: ID of the newly created task.
1121 """
1122 task = self.view_task(task_id)
1123 if not task:
1124 return
1125
1126 if task.category == "file":
1127 add = self.add_path
1128 elif task.category == "url":
1129 add = self.add_url
1130 else:
1131 return
1132
1133
1134 session = self.Session()
1135 session.query(Task).get(task_id).status = TASK_RECOVERED
1136 try:
1137 session.commit()
1138 except SQLAlchemyError as e:
1139 log.debug("Database error rescheduling task: {0}".format(e))
1140 session.rollback()
1141 return False
1142 finally:
1143 session.close()
1144
1145
1146 if task.tags:
1147 tags = ",".join(tag.name for tag in task.tags)
1148 else:
1149 tags = task.tags
1150
1151
1152 if priority:
1153 task.priority = priority
1154
1155 options = emit_options(task.options)
1156 return add(task.target, task.timeout, task.package, options,
1157 task.priority, task.custom, task.owner, task.machine,
1158 task.platform, tags, task.memory, task.enforce_timeout,
1159 task.clock)
1160
def list_tasks(self, limit=None, details=True, category=None, owner=None,
               offset=None, status=None, sample_id=None, not_status=None,
               completed_after=None, order_by=None):
    """Retrieve list of task.
    @param limit: specify a limit of entries.
    @param details: if details about must be included
    @param category: filter by category
    @param owner: task owner
    @param offset: list offset
    @param status: filter by task status
    @param sample_id: filter tasks for a sample
    @param not_status: exclude this task status from filter
    @param completed_after: only list tasks completed after this timestamp
    @param order_by: definition which field to sort by
    @return: list of tasks.
    """
    session = self.Session()
    try:
        query = session.query(Task)

        # Apply each optional filter in turn.
        if status:
            query = query.filter_by(status=status)
        if not_status:
            query = query.filter(Task.status != not_status)
        if category:
            query = query.filter_by(category=category)
        if owner:
            query = query.filter_by(owner=owner)
        if details:
            # Eagerly load the relations so they survive session close.
            query = query.options(joinedload("guest"),
                                  joinedload("errors"),
                                  joinedload("tags"))
        if sample_id is not None:
            query = query.filter_by(sample_id=sample_id)
        if completed_after:
            query = query.filter(Task.completed_on > completed_after)

        # Default ordering is newest submissions first.
        if order_by is not None:
            query = query.order_by(order_by)
        else:
            query = query.order_by(Task.added_on.desc())

        return query.limit(limit).offset(offset).all()
    except SQLAlchemyError as e:
        log.debug("Database error listing tasks: {0}".format(e))
        return []
    finally:
        session.close()
1208
1210 """Find tasks minimum and maximum
1211 @return: unix timestamps of minimum and maximum
1212 """
1213 session = self.Session()
1214 try:
1215 _min = session.query(func.min(Task.started_on).label("min")).first()
1216 _max = session.query(func.max(Task.completed_on).label("max")).first()
1217 return int(_min[0].strftime("%s")), int(_max[0].strftime("%s"))
1218 except SQLAlchemyError as e:
1219 log.debug("Database error counting tasks: {0}".format(e))
1220 return 0
1221 finally:
1222 session.close()
1223
1224 @classlock
1226 """Count tasks in the database
1227 @param status: apply a filter according to the task status
1228 @return: number of tasks found
1229 """
1230 session = self.Session()
1231 try:
1232 if status:
1233 tasks_count = session.query(Task).filter_by(status=status).count()
1234 else:
1235 tasks_count = session.query(Task).count()
1236 return tasks_count
1237 except SQLAlchemyError as e:
1238 log.debug("Database error counting tasks: {0}".format(e))
1239 return 0
1240 finally:
1241 session.close()
1242
@classlock
def view_task(self, task_id, details=True):
    """Retrieve information on a task.
    @param task_id: ID of the task to query.
    @param details: also eagerly load the guest, errors and tags relations.
    @return: details on the task.
    """
    session = self.Session()
    try:
        query = session.query(Task)
        if details:
            query = query.options(joinedload("guest"),
                                  joinedload("errors"),
                                  joinedload("tags"))
        task = query.get(task_id)
    except SQLAlchemyError as e:
        log.debug("Database error viewing task: {0}".format(e))
        return None
    else:
        if task:
            # Detach the object so it remains usable after session close.
            session.expunge(task)
        return task
    finally:
        session.close()
1264
1265 @classlock
1267 """Delete information on a task.
1268 @param task_id: ID of the task to query.
1269 @return: operation status.
1270 """
1271 session = self.Session()
1272 try:
1273 task = session.query(Task).get(task_id)
1274 session.delete(task)
1275 session.commit()
1276 except SQLAlchemyError as e:
1277 log.debug("Database error deleting task: {0}".format(e))
1278 session.rollback()
1279 return False
1280 finally:
1281 session.close()
1282 return True
1283
1284 @classlock
1286 """Retrieve information on a sample given a sample id.
1287 @param sample_id: ID of the sample to query.
1288 @return: details on the sample used in sample: sample_id.
1289 """
1290 session = self.Session()
1291 try:
1292 sample = session.query(Sample).get(sample_id)
1293 except AttributeError:
1294 return None
1295 except SQLAlchemyError as e:
1296 log.debug("Database error viewing task: {0}".format(e))
1297 return None
1298 else:
1299 if sample:
1300 session.expunge(sample)
1301 finally:
1302 session.close()
1303
1304 return sample
1305
1306 @classlock
1327
1328 @classlock
1330 """Counts the amount of samples in the database."""
1331 session = self.Session()
1332 try:
1333 sample_count = session.query(Sample).count()
1334 except SQLAlchemyError as e:
1335 log.debug("Database error counting samples: {0}".format(e))
1336 return 0
1337 finally:
1338 session.close()
1339 return sample_count
1340
1341 @classlock
1343 """Show virtual machine.
1344 @params name: virtual machine name
1345 @return: virtual machine's details
1346 """
1347 session = self.Session()
1348 try:
1349 machine = session.query(Machine).options(joinedload("tags")).filter_by(name=name).first()
1350 except SQLAlchemyError as e:
1351 log.debug("Database error viewing machine: {0}".format(e))
1352 return None
1353 else:
1354 if machine:
1355 session.expunge(machine)
1356 finally:
1357 session.close()
1358 return machine
1359
1360 @classlock
1362 """Show virtual machine.
1363 @params label: virtual machine label
1364 @return: virtual machine's details
1365 """
1366 session = self.Session()
1367 try:
1368 machine = session.query(Machine).options(joinedload("tags")).filter_by(label=label).first()
1369 except SQLAlchemyError as e:
1370 log.debug("Database error viewing machine by label: {0}".format(e))
1371 return None
1372 else:
1373 if machine:
1374 session.expunge(machine)
1375 finally:
1376 session.close()
1377 return machine
1378
1379 @classlock
1381 """Get all errors related to a task.
1382 @param task_id: ID of task associated to the errors
1383 @return: list of errors.
1384 """
1385 session = self.Session()
1386 try:
1387 errors = session.query(Error).filter_by(task_id=task_id).all()
1388 except SQLAlchemyError as e:
1389 log.debug("Database error viewing errors: {0}".format(e))
1390 return []
1391 finally:
1392 session.close()
1393 return errors
1394
1396 """Get an available task for processing."""
1397 session = self.Session()
1398
1399
1400
1401
1402
1403
1404
1405
1406 query = """
1407 UPDATE tasks SET processing = :instance
1408 WHERE id IN (
1409 SELECT id FROM tasks
1410 WHERE status = :status AND processing IS NULL
1411 ORDER BY priority DESC, id ASC LIMIT 1 FOR UPDATE
1412 )
1413 RETURNING id
1414 """
1415
1416 try:
1417 params = {
1418 "instance": instance,
1419 "status": TASK_COMPLETED,
1420 }
1421 task = session.execute(query, params).first()
1422 session.commit()
1423 return task[0] if task else None
1424 except SQLAlchemyError as e:
1425 log.debug("Database error getting new processing tasks: %s", e)
1426 finally:
1427 session.close()
1428