1
2
3
4
5 import os
6 import socket
7 import select
8 import logging
9 import datetime
10 import SocketServer
11 from threading import Event, Thread
12
13 from lib.cuckoo.common.config import Config
14 from lib.cuckoo.common.constants import CUCKOO_ROOT
15 from lib.cuckoo.common.exceptions import CuckooOperationalError
16 from lib.cuckoo.common.exceptions import CuckooCriticalError
17 from lib.cuckoo.common.exceptions import CuckooResultError
18 from lib.cuckoo.common.netlog import NetlogParser, BsonParser
19 from lib.cuckoo.common.utils import create_folder, Singleton, logtime
20
21 log = logging.getLogger(__name__)
22
23 BUFSIZE = 16 * 1024
24 EXTENSIONS = {
25 NetlogParser: ".raw",
26 BsonParser: ".bson",
27 }
28
31
32
34 """Result server. Singleton!
35
36 This class handles results coming back from the analysis machines.
37 """
38
39 __metaclass__ = Singleton
40
41 allow_reuse_address = True
42 daemon_threads = True
43
45 self.cfg = Config()
46 self.analysistasks = {}
47 self.analysishandlers = {}
48
49 try:
50 server_addr = self.cfg.resultserver.ip, self.cfg.resultserver.port
51 SocketServer.ThreadingTCPServer.__init__(self,
52 server_addr,
53 Resulthandler,
54 *args,
55 **kwargs)
56 except Exception as e:
57 raise CuckooCriticalError("Unable to bind result server on "
58 "{0}:{1}: {2}".format(
59 self.cfg.resultserver.ip,
60 self.cfg.resultserver.port, str(e)))
61 else:
62 self.servethread = Thread(target=self.serve_forever)
63 self.servethread.setDaemon(True)
64 self.servethread.start()
65
67 """Register a task/machine with the Resultserver."""
68 self.analysistasks[machine.ip] = (task, machine)
69 self.analysishandlers[task.id] = []
70
72 """Delete Resultserver state and wait for pending RequestHandlers."""
73 x = self.analysistasks.pop(machine.ip, None)
74 if not x:
75 log.warning("Resultserver did not have {0} in its task "
76 "info.".format(machine.ip))
77 handlers = self.analysishandlers.pop(task.id, None)
78 for h in handlers:
79 h.end_request.set()
80 h.done_event.wait()
81
83 """Register a RequestHandler so that we can later wait for it."""
84 task, machine = self.get_ctx_for_ip(handler.client_address[0])
85 if not task or not machine:
86 return False
87 self.analysishandlers[task.id].append(handler)
88
90 """Return state for this IP's task."""
91 x = self.analysistasks.get(ip, None)
92 if not x:
93 log.critical("Resultserver unable to map ip to "
94 "context: {0}.".format(ip))
95 return None, None
96
97 return x
98
100 """Initialize analysis storage folder."""
101 task, machine = self.get_ctx_for_ip(ip)
102 if not task or not machine:
103 return False
104
105 storagepath = os.path.join(CUCKOO_ROOT, "storage",
106 "analyses", str(task.id))
107 return storagepath
108
109
111 """Result handler.
112
113 This handler speaks our analysis log network protocol.
114 """
115
117 self.logfd = None
118 self.rawlogfd = None
119 self.protocol = None
120 self.startbuf = ""
121 self.end_request = Event()
122 self.done_event = Event()
123 self.pid, self.ppid, self.procname = (None, None, None)
124 self.server.register_handler(self)
125
127 self.done_event.set()
128
130 while True:
131 if self.end_request.isSet():
132 return False
133 rs, ws, xs = select.select([self.request], [], [], 1)
134 if rs:
135 return True
136
137 - def read(self, length):
138 buf = ""
139 while len(buf) < length:
140 if not self.wait_sock_or_end():
141 raise Disconnect()
142 tmp = self.request.recv(length-len(buf))
143 if not tmp:
144 raise Disconnect()
145 buf += tmp
146
147 if isinstance(self.protocol, (NetlogParser, BsonParser)):
148 if self.rawlogfd:
149 self.rawlogfd.write(buf)
150 else:
151 self.startbuf += buf
152 return buf
153
161
163 buf = ""
164 while not "\n" in buf:
165 buf += self.read(1)
166 return buf
167
183
185 ip, port = self.client_address
186 self.connect_time = datetime.datetime.now()
187 log.debug("New connection from: {0}:{1}".format(ip, port))
188
189 self.storagepath = self.server.build_storage_path(ip)
190 if not self.storagepath:
191 return
192
193
194 self.create_folders()
195
196 try:
197
198 self.negotiate_protocol()
199
200 while True:
201 r = self.protocol.read_next_message()
202 if not r:
203 break
204 except CuckooResultError as e:
205 log.warning("Resultserver connection stopping because of "
206 "CuckooResultError: %s.", str(e))
207 except Disconnect:
208 pass
209 except socket.error, e:
210 log.debug("socket.error: {0}".format(e))
211 except:
212 log.exception("FIXME - exception in resultserver connection %s",
213 str(self.client_address))
214
215 try:
216 self.protocol.close()
217 except:
218 pass
219
220 if self.logfd:
221 self.logfd.close()
222 if self.rawlogfd:
223 self.rawlogfd.close()
224 log.debug("Connection closed: {0}:{1}".format(ip, port))
225
226 - def log_process(self, ctx, timestring, pid, ppid, modulepath, procname):
227 if not self.pid is None:
228 log.debug("Resultserver got a new process message but already "
229 "has pid %d ppid %s procname %s",
230 pid, str(ppid), procname)
231 raise CuckooResultError("Resultserver connection state "
232 "incosistent.")
233
234 log.debug("New process (pid={0}, ppid={1}, name={2}, "
235 "path={3})".format(pid, ppid, procname, modulepath))
236
237
238 if self.server.cfg.resultserver.store_csvs:
239 self.logfd = open(os.path.join(self.storagepath, "logs",
240 str(pid) + ".csv"), "wb")
241
242
243 ext = EXTENSIONS.get(type(self.protocol), ".raw")
244 self.rawlogfd = open(os.path.join(self.storagepath, "logs",
245 str(pid) + ext), "wb")
246 self.rawlogfd.write(self.startbuf)
247
248 self.pid, self.ppid, self.procname = pid, ppid, procname
249
251 log.debug("New thread (tid={0}, pid={1})".format(context[3], pid))
252
253 - def log_call(self, context, apiname, modulename, arguments):
254 if not self.rawlogfd:
255 raise CuckooOperationalError("Netlog failure, call "
256 "before process.")
257
258 apiindex, status, returnval, tid, timediff = context
259
260
261
262 current_time = self.connect_time + datetime.timedelta(0, 0,
263 timediff*1000)
264 timestring = logtime(current_time)
265
266 argumentstrings = ["{0}->{1}".format(argname, repr(str(r))[1:-1])
267 for argname, r in arguments]
268
269 if self.logfd:
270 print >>self.logfd, ",".join("\"{0}\"".format(i) for i in [
271 timestring, self.pid, self.procname, tid, self.ppid,
272 modulename, apiname, status, returnval] + argumentstrings)
273
275 log.warning("Resultserver error condition on connection %s "
276 "(pid %s procname %s): %s", str(self.client_address),
277 str(self.pid), str(self.procname), emsg)
278
280 folders = ["shots", "files", "logs"]
281
282 for folder in folders:
283 try:
284 create_folder(self.storagepath, folder=folder)
285 except CuckooOperationalError:
286 log.error("Unable to create folder %s" % folder)
287 return False
288
289
292 self.handler = handler
293 self.upload_max_size = \
294 self.handler.server.cfg.resultserver.upload_max_size
295 self.storagepath = self.handler.storagepath
296
298
299
300
301 buf = self.handler.read_newline().strip().replace("\\", "/")
302 log.debug("File upload request for {0}".format(buf))
303
304 if "../" in buf:
305 raise CuckooOperationalError("FileUpload failure, banned path.")
306
307 dir_part, filename = os.path.split(buf)
308
309 if dir_part:
310 try:
311 create_folder(self.storagepath, dir_part)
312 except CuckooOperationalError:
313 log.error("Unable to create folder %s" % dir_part)
314 return False
315
316 file_path = os.path.join(self.storagepath, buf.strip())
317
318 fd = open(file_path, "wb")
319 chunk = self.handler.read_any()
320 while chunk:
321 fd.write(chunk)
322
323 if fd.tell() >= self.upload_max_size:
324 fd.write("... (truncated)")
325 break
326
327 chunk = self.handler.read_any()
328
329 log.debug("Uploaded file length: {0}".format(fd.tell()))
330 fd.close()
331
332
335 self.handler = handler
336 self.logpath = os.path.join(handler.storagepath, "analysis.log")
337 self.fd = self._open()
338 log.debug("LogHandler for live analysis.log initialized.")
339
341 buf = self.handler.read_newline()
342 if not buf:
343 return False
344 self.fd.write(buf)
345 self.fd.flush()
346 return True
347
350
352 if os.path.exists(self.logpath):
353 return open(self.logpath, "ab")
354 return open(self.logpath, "wb")
355