Package lib :: Package cuckoo :: Package core :: Module resultserver
[hide private]
[frames | no frames]

Source Code for Module lib.cuckoo.core.resultserver

  1  # Copyright (C) 2010-2014 Cuckoo Foundation. 
  2  # This file is part of Cuckoo Sandbox - http://www.cuckoosandbox.org 
  3  # See the file 'docs/LICENSE' for copying permission. 
  4   
  5  import os 
  6  import socket 
  7  import select 
  8  import logging 
  9  import datetime 
 10  import SocketServer 
 11  from threading import Event, Thread 
 12   
 13  from lib.cuckoo.common.config import Config 
 14  from lib.cuckoo.common.constants import CUCKOO_ROOT 
 15  from lib.cuckoo.common.exceptions import CuckooOperationalError 
 16  from lib.cuckoo.common.exceptions import CuckooCriticalError 
 17  from lib.cuckoo.common.exceptions import CuckooResultError 
 18  from lib.cuckoo.common.netlog import NetlogParser, BsonParser 
 19  from lib.cuckoo.common.utils import create_folder, Singleton, logtime 
 20   
 21  log = logging.getLogger(__name__) 
 22   
 23  BUFSIZE = 16 * 1024 
 24  EXTENSIONS = { 
 25      NetlogParser: ".raw", 
 26      BsonParser: ".bson", 
 27  } 
 28   
class Disconnect(Exception):
    """Signal that a result connection has ended.

    Raised by the Resulthandler read helpers when the peer closes the socket
    or when the handler has been asked to stop.
    """
31 32
class Resultserver(SocketServer.ThreadingTCPServer, object):
    """Result server. Singleton!

    This class handles results coming back from the analysis machines.
    It keeps two pieces of state:

    * ``analysistasks``: maps a machine IP to its ``(task, machine)`` pair.
    * ``analysishandlers``: maps a task id to the list of request handlers
      currently serving connections for that task.
    """

    __metaclass__ = Singleton

    allow_reuse_address = True
    daemon_threads = True

    def __init__(self, *args, **kwargs):
        """Bind the server and start serving in a daemon thread.

        The listen address is read from the ``resultserver`` section of the
        configuration. Extra arguments are forwarded to
        ``ThreadingTCPServer.__init__``.

        Raises:
            CuckooCriticalError: if the server cannot be set up/bound.
        """
        self.cfg = Config()
        self.analysistasks = {}
        self.analysishandlers = {}

        try:
            server_addr = self.cfg.resultserver.ip, self.cfg.resultserver.port
            SocketServer.ThreadingTCPServer.__init__(self,
                                                     server_addr,
                                                     Resulthandler,
                                                     *args,
                                                     **kwargs)
        except Exception as e:
            raise CuckooCriticalError("Unable to bind result server on "
                                      "{0}:{1}: {2}".format(
                                          self.cfg.resultserver.ip,
                                          self.cfg.resultserver.port, str(e)))
        else:
            self.servethread = Thread(target=self.serve_forever)
            self.servethread.daemon = True
            self.servethread.start()

    def add_task(self, task, machine):
        """Register a task/machine with the Resultserver."""
        self.analysistasks[machine.ip] = (task, machine)
        self.analysishandlers[task.id] = []

    def del_task(self, task, machine):
        """Delete Resultserver state and wait for pending RequestHandlers."""
        x = self.analysistasks.pop(machine.ip, None)
        if not x:
            log.warning("Resultserver did not have {0} in its task "
                        "info.".format(machine.ip))

        # A task that was never registered has no handler list; fall back to
        # an empty list instead of iterating over None.
        handlers = self.analysishandlers.pop(task.id, None) or []

        # Ask every handler to stop first, then wait for each of them, so
        # the handlers wind down concurrently rather than one after another.
        for h in handlers:
            h.end_request.set()
        for h in handlers:
            h.done_event.wait()

    def register_handler(self, handler):
        """Register a RequestHandler so that we can later wait for it.

        Returns:
            True if the handler was registered, False if the client IP does
            not belong to a known task.
        """
        task, machine = self.get_ctx_for_ip(handler.client_address[0])
        if not task or not machine:
            return False
        self.analysishandlers[task.id].append(handler)
        return True

    def get_ctx_for_ip(self, ip):
        """Return state for this IP's task.

        Returns:
            The ``(task, machine)`` pair registered for ``ip``, or
            ``(None, None)`` if the IP is unknown.
        """
        x = self.analysistasks.get(ip, None)
        if not x:
            log.critical("Resultserver unable to map ip to "
                         "context: {0}.".format(ip))
            return None, None

        return x

    def build_storage_path(self, ip):
        """Initialize analysis storage folder.

        Returns:
            The storage path of the task registered for ``ip``, or False if
            the IP is unknown. The folder itself is not created here.
        """
        task, machine = self.get_ctx_for_ip(ip)
        if not task or not machine:
            return False

        storagepath = os.path.join(CUCKOO_ROOT, "storage",
                                   "analyses", str(task.id))
        return storagepath
108 109
class Resulthandler(SocketServer.BaseRequestHandler):
    """Result handler.

    This handler speaks our analysis log network protocol. One instance
    serves one connection; the first line sent by the client selects the
    protocol object that consumes the rest of the stream.
    """

    def setup(self):
        """Initialise per-connection state and register with the server."""
        self.logfd = None
        self.rawlogfd = None
        self.protocol = None
        # Bytes read before the raw log file exists; flushed into it by
        # log_process().
        self.startbuf = ""
        # Set by the server (del_task) to ask this handler to stop.
        self.end_request = Event()
        # Set by finish() so the server can wait for this handler.
        self.done_event = Event()
        self.pid, self.ppid, self.procname = (None, None, None)
        self.server.register_handler(self)

    def finish(self):
        """Signal that this handler is done."""
        self.done_event.set()

    def wait_sock_or_end(self):
        """Block until the socket is readable or a stop was requested.

        Returns:
            True when the socket has data to read, False when
            ``end_request`` has been set. The stop flag is re-checked after
            every one-second select() timeout.
        """
        while True:
            if self.end_request.is_set():
                return False
            rs, ws, xs = select.select([self.request], [], [], 1)
            if rs:
                return True

    def read(self, length):
        """Read exactly ``length`` bytes from the connection.

        When the active protocol is a NetlogParser or BsonParser the data is
        also copied to the raw log file, or buffered in ``startbuf`` while
        that file has not been opened yet.

        Raises:
            Disconnect: if a stop was requested or the peer closed the
                connection before ``length`` bytes arrived.
        """
        chunks = []
        remaining = length
        while remaining > 0:
            if not self.wait_sock_or_end():
                raise Disconnect()
            tmp = self.request.recv(remaining)
            if not tmp:
                raise Disconnect()
            chunks.append(tmp)
            remaining -= len(tmp)
        buf = "".join(chunks)

        if isinstance(self.protocol, (NetlogParser, BsonParser)):
            if self.rawlogfd:
                self.rawlogfd.write(buf)
            else:
                self.startbuf += buf
        return buf

    def read_any(self):
        """Read whatever is available, up to BUFSIZE bytes.

        Raises:
            Disconnect: if a stop was requested or the peer closed the
                connection.
        """
        if not self.wait_sock_or_end():
            raise Disconnect()
        tmp = self.request.recv(BUFSIZE)
        if not tmp:
            raise Disconnect()
        return tmp

    def read_newline(self):
        """Read up to and including the next newline character.

        Reads one byte at a time so that nothing past the newline is
        consumed from the socket.
        """
        buf = ""
        while "\n" not in buf:
            buf += self.read(1)
        return buf

    def negotiate_protocol(self):
        """Pick the protocol object from the first line sent by the client.

        Raises:
            CuckooOperationalError: if the line names no known protocol.
        """
        # read until newline
        buf = self.read_newline()

        # "NETLOG" must be tested before "LOG": the latter is a substring
        # of the former.
        if "NETLOG" in buf:
            self.protocol = NetlogParser(self)
        elif "BSON" in buf:
            self.protocol = BsonParser(self)
        elif "FILE" in buf:
            self.protocol = FileUpload(self)
        elif "LOG" in buf:
            self.protocol = LogHandler(self)
        else:
            raise CuckooOperationalError("Netlog failure, unknown "
                                         "protocol requested.")

    def handle(self):
        """Serve one connection until it ends, then release its resources."""
        ip, port = self.client_address
        self.connect_time = datetime.datetime.now()
        log.debug("New connection from: {0}:{1}".format(ip, port))

        self.storagepath = self.server.build_storage_path(ip)
        if not self.storagepath:
            return

        # create all missing folders for this analysis
        self.create_folders()

        try:
            # initialize the protocol handler class for this connection
            self.negotiate_protocol()

            while self.protocol.read_next_message():
                pass
        except CuckooResultError as e:
            log.warning("Resultserver connection stopping because of "
                        "CuckooResultError: %s.", str(e))
        except Disconnect:
            pass
        except socket.error as e:
            log.debug("socket.error: {0}".format(e))
        except Exception:
            log.exception("FIXME - exception in resultserver connection %s",
                          str(self.client_address))
        finally:
            # Not every protocol object defines close(), and negotiation may
            # have failed before one was created; closing stays best-effort
            # but failures are no longer hidden completely.
            close = getattr(self.protocol, "close", None)
            if close is not None:
                try:
                    close()
                except Exception:
                    log.debug("Error while closing protocol handler",
                              exc_info=True)

            if self.logfd:
                self.logfd.close()
            if self.rawlogfd:
                self.rawlogfd.close()
            log.debug("Connection closed: {0}:{1}".format(ip, port))

    def log_process(self, ctx, timestring, pid, ppid, modulepath, procname):
        """Record the process this connection reports on and open its logs.

        Opens the raw log (and the CSV log when ``store_csvs`` is enabled)
        under ``<storagepath>/logs`` and writes the bytes buffered so far
        into the raw log.

        Raises:
            CuckooResultError: if a process was already announced on this
                connection.
        """
        if self.pid is not None:
            log.debug("Resultserver got a new process message but already "
                      "has pid %d ppid %s procname %s",
                      pid, str(ppid), procname)
            raise CuckooResultError("Resultserver connection state "
                                    "incosistent.")

        log.debug("New process (pid={0}, ppid={1}, name={2}, "
                  "path={3})".format(pid, ppid, procname, modulepath))

        # CSV format files are optional
        if self.server.cfg.resultserver.store_csvs:
            self.logfd = open(os.path.join(self.storagepath, "logs",
                                           str(pid) + ".csv"), "wb")

        # Raw Bson or Netlog extension
        ext = EXTENSIONS.get(type(self.protocol), ".raw")
        self.rawlogfd = open(os.path.join(self.storagepath, "logs",
                                          str(pid) + ext), "wb")
        self.rawlogfd.write(self.startbuf)

        self.pid, self.ppid, self.procname = pid, ppid, procname

    def log_thread(self, context, pid):
        """Log the announcement of a new thread (tid is ``context[3]``)."""
        log.debug("New thread (tid={0}, pid={1})".format(context[3], pid))

    def log_call(self, context, apiname, modulename, arguments):
        """Append one API call to the CSV log, if CSV logging is enabled.

        Args:
            context: 5-tuple ``(apiindex, status, returnval, tid, timediff)``.
                ``timediff`` is an offset from the connection time;
                presumably milliseconds, since it is scaled by 1000 into
                microseconds -- TODO confirm against the parsers.
            apiname: name of the API that was called.
            modulename: module/category the API belongs to.
            arguments: iterable of ``(name, value)`` pairs.

        Raises:
            CuckooOperationalError: if no process was announced yet.
        """
        if not self.rawlogfd:
            raise CuckooOperationalError("Netlog failure, call "
                                         "before process.")

        apiindex, status, returnval, tid, timediff = context

        # CSV output is optional; skip the formatting work when it is off.
        if not self.logfd:
            return

        current_time = self.connect_time + datetime.timedelta(0, 0,
                                                              timediff*1000)
        timestring = logtime(current_time)

        argumentstrings = ["{0}->{1}".format(argname, repr(str(r))[1:-1])
                           for argname, r in arguments]

        fields = [timestring, self.pid, self.procname, tid, self.ppid,
                  modulename, apiname, status, returnval] + argumentstrings
        self.logfd.write(",".join("\"{0}\"".format(i) for i in fields) + "\n")

    def log_error(self, emsg):
        """Log an error condition reported for this connection."""
        log.warning("Resultserver error condition on connection %s "
                    "(pid %s procname %s): %s", str(self.client_address),
                    str(self.pid), str(self.procname), emsg)

    def create_folders(self):
        """Create the standard sub-folders of the analysis storage folder.

        Returns:
            True if all folders are in place, False as soon as one of them
            could not be created.
        """
        folders = ["shots", "files", "logs"]

        for folder in folders:
            try:
                create_folder(self.storagepath, folder=folder)
            except CuckooOperationalError:
                log.error("Unable to create folder %s", folder)
                return False
        return True
288 289
class FileUpload(object):
    """Protocol handler that stores one uploaded file in the analysis folder.

    The client sends a relative destination path terminated by a newline,
    followed by the file content until it closes the connection.
    """

    def __init__(self, handler):
        self.handler = handler
        self.upload_max_size = \
            self.handler.server.cfg.resultserver.upload_max_size
        self.storagepath = self.handler.storagepath

    def _resolve_upload_path(self, relpath):
        """Return the absolute destination for ``relpath``, or None if banned.

        A path is banned when it contains ``../`` or when, once joined to the
        storage folder and normalised, it does not lie strictly inside that
        folder. The second test also rejects absolute paths, which
        os.path.join() would otherwise let replace the storage folder.
        """
        if "../" in relpath:
            return None

        root = os.path.abspath(self.storagepath)
        target = os.path.abspath(os.path.join(root, relpath))
        if not target.startswith(root + os.sep):
            return None
        return target

    def read_next_message(self):
        """Receive one file and write it below the storage folder.

        The stored file is cut off once it reaches ``upload_max_size`` bytes
        and a truncation marker is appended.

        Returns:
            False if the destination folder could not be created, otherwise
            None once the size limit is hit. Both are falsy, so the caller
            stops reading. A normal end of upload surfaces as the exception
            raised by the handler's ``read_any`` when the peer disconnects.

        Raises:
            CuckooOperationalError: if the requested path is banned.
        """
        # read until newline for file path
        # e.g. shots/0001.jpg or files/9498687557/libcurl-4.dll.bin

        buf = self.handler.read_newline().strip().replace("\\", "/")
        log.debug("File upload request for {0}".format(buf))

        # The path comes from the analysis machine and is untrusted.
        file_path = self._resolve_upload_path(buf)
        if file_path is None:
            raise CuckooOperationalError("FileUpload failure, banned path.")

        dir_part, filename = os.path.split(buf)

        if dir_part:
            try:
                create_folder(self.storagepath, dir_part)
            except CuckooOperationalError:
                log.error("Unable to create folder %s", dir_part)
                return False

        fd = open(file_path, "wb")
        try:
            chunk = self.handler.read_any()
            while chunk:
                fd.write(chunk)

                if fd.tell() >= self.upload_max_size:
                    fd.write("... (truncated)")
                    break

                chunk = self.handler.read_any()
        finally:
            # read_any() raises when the peer disconnects, which is how an
            # upload normally ends; close the file on that path too.
            log.debug("Uploaded file length: {0}".format(fd.tell()))
            fd.close()

    def close(self):
        """Nothing to release; the file is closed by read_next_message().

        Present because Resulthandler.handle calls close() on whichever
        protocol object is active.
        """
331 332
class LogHandler(object):
    """Protocol handler that appends received lines to ``analysis.log``."""

    def __init__(self, handler):
        self.handler = handler
        self.logpath = os.path.join(handler.storagepath, "analysis.log")
        self.fd = self._open()
        log.debug("LogHandler for live analysis.log initialized.")

    def read_next_message(self):
        """Read one line from the connection and append it to the log.

        Returns:
            True after a line was written, False if an empty line buffer
            was returned by the handler.
        """
        buf = self.handler.read_newline()
        if not buf:
            return False
        self.fd.write(buf)
        # Flush after every line so the log on disk stays current.
        self.fd.flush()
        return True

    def close(self):
        """Close the log file."""
        self.fd.close()

    def _open(self):
        """Open the log for appending, creating it if it does not exist.

        Append mode already creates a missing file, so no existence check is
        needed; a check followed by a truncating open could wipe a log that
        was created in between.
        """
        return open(self.logpath, "ab")
355