Package lib :: Module hpfeeds
[hide private]
[frames | no frames]

Source Code for Module lib.hpfeeds

  1   
  2  import struct 
  3  import socket 
  4  import hashlib 
  5  import logging 
  6  from time import sleep 
  7   
  8  logger = logging.getLogger('pyhpfeeds') 
  9   
 10  OP_ERROR        = 0 
 11  OP_INFO         = 1 
 12  OP_AUTH         = 2 
 13  OP_PUBLISH      = 3 
 14  OP_SUBSCRIBE    = 4 
 15  BUFSIZ = 16384 
 16   
 17  __all__ = ["new", "FeedException"] 
 18   
def msghdr(op, data):
    """Frame *data* as one wire message.

    The result is a 4-byte big-endian signed length, a 1-byte opcode and
    then the payload.  The length counts the 5 header bytes as well as the
    payload.

    :param op: opcode stored in the single header byte.
    :param data: payload byte string appended after the header.
    :return: the header followed by *data*.
    """
    total_length = len(data) + 5
    header = struct.pack('!iB', total_length, op)
    return header + data
def msgpublish(ident, chan, data):
    """Build an OP_PUBLISH message.

    The payload is: one length byte plus *ident*, one length byte plus
    *chan*, then *data* verbatim.  Nothing is encoded here, so all three
    arguments must already be byte strings; *ident* and *chan* must each
    fit the one-byte length prefix.

    :return: the complete framed message as produced by ``msghdr``.
    """
    payload = struct.pack('!B', len(ident)) + ident
    payload = payload + struct.pack('!B', len(chan)) + chan
    payload = payload + data
    return msghdr(OP_PUBLISH, payload)
def msgsubscribe(ident, chan):
    """Build an OP_SUBSCRIBE message.

    The payload is one length byte plus *ident*, followed by *chan*, which
    runs to the end of the message and therefore carries no length prefix.

    :return: the complete framed message as produced by ``msghdr``.
    """
    ident_length = struct.pack('!B', len(ident))
    payload = ident_length + ident + chan
    return msghdr(OP_SUBSCRIBE, payload)
def msgauth(rand, ident, secret):
    """Build an OP_AUTH message.

    The payload is one length byte plus *ident*, followed by the raw SHA-1
    digest of *rand* concatenated with *secret*.

    :param rand: byte string that is hashed together with *secret*.
    :param ident: identifier sent in clear text.
    :param secret: shared secret; only its digest is put on the wire.
    :return: the complete framed message as produced by ``msghdr``.
    """
    digest = hashlib.sha1(rand + secret).digest()
    ident_length = struct.pack('!B', len(ident))
    return msghdr(OP_AUTH, ident_length + ident + digest)
30
class FeedUnpack(object):
    """Incremental parser that splits a byte stream into framed messages.

    Bytes are appended with :meth:`feed`; iterating over the instance (or
    calling :meth:`unpack`) yields ``(opcode, data)`` tuples for every
    complete message currently buffered.  Iteration simply stops when only
    a partial message is left, and can be resumed after feeding more bytes.
    """

    def __init__(self):
        self.buf = bytearray()

    def __iter__(self):
        return self

    def next(self):
        return self.unpack()

    # Python 3 spells the iterator protocol method ``__next__``; keep both
    # names so the class iterates under either interpreter.
    __next__ = next

    def feed(self, data):
        """Append received bytes to the internal buffer."""
        self.buf.extend(data)

    def unpack(self):
        """Remove and return the next complete message.

        :return: ``(opcode, data)`` where *opcode* is an int and *data* is
            a new ``bytearray`` holding the payload without the header.
        :raises StopIteration: fewer bytes are buffered than one whole
            message needs.
        :raises FeedException: the header announces a total length smaller
            than the 5-byte header itself.
        """
        if len(self.buf) < 5:
            raise StopIteration('No message.')

        # bytes() gives struct a plain byte string on every Python version.
        ml, opcode = struct.unpack('!iB', bytes(self.buf[:5]))
        if ml < 5:
            # The length includes the header, so anything below 5 (or a
            # negative value) is corrupt.  Without this check a length of 0
            # consumes nothing and the same message is yielded forever.
            raise FeedException('Invalid message length.')
        if len(self.buf) < ml:
            raise StopIteration('No message.')

        data = self.buf[5:ml]
        del self.buf[:ml]
        return opcode, data


class FeedException(Exception):
    """Error type raised by this module for connection and protocol failures."""
54
class HPC(object):
    """Blocking client: connect, authenticate, publish, subscribe, receive.

    The constructor connects immediately, so it raises ``FeedException``
    when the broker cannot be reached or does not start with an info
    message.

    :param host: broker host name or address.
    :param port: broker TCP port.
    :param ident: identifier sent with auth/publish/subscribe messages.
    :param secret: shared secret hashed into the auth message.
    :param timeout: socket timeout in seconds used while connecting and
        waiting for the info message; the socket is switched to blocking
        mode once the handshake is done.
    :param reconnect: when true, :meth:`run` keeps retrying a failed
        connection every *sleepwait* seconds instead of raising.
    :param sleepwait: seconds to sleep before each reconnect attempt.

    NOTE: subscribe messages are not re-sent after a reconnect; callers
    that rely on reconnecting have to subscribe again themselves.
    """

    def __init__(self, host, port, ident, secret, timeout=3, reconnect=False, sleepwait=20):
        self.host, self.port = host, port
        self.ident, self.secret = ident, secret
        self.timeout = timeout
        self.reconnect = reconnect
        self.sleepwait = sleepwait
        self.brokername = 'unknown'
        self.connected = False
        self.stopped = False
        self.s = None
        self.unpacker = FeedUnpack()

        self.connect()

    @staticmethod
    def _read_field(data):
        """Split one length-prefixed field off the front of *data*.

        :param data: ``bytearray`` whose first byte is the field length.
        :return: ``(field, rest)`` - the field as a byte string and the
            remaining bytes as a ``bytearray``.
        :raises FeedException: *data* is shorter than the prefix announces.
        """
        if not data or len(data) < 1 + data[0]:
            raise FeedException('Malformed message.')
        end = 1 + data[0]
        return bytes(data[1:end]), data[end:]

    @staticmethod
    def _channels(chaninfo):
        """Return *chaninfo* as an iterable of channels (wrap a single name)."""
        if isinstance(chaninfo, (str, bytes)):
            return [chaninfo]
        return chaninfo

    def _handshake(self):
        """Wait for the broker's info message and answer it with an auth message.

        Keeps reading until one complete message is buffered, so an info
        message split across several reads is still handled.
        """
        while True:
            try:
                chunk = self.s.recv(BUFSIZ)
            except socket.timeout:
                raise FeedException('Connection receive timeout.')
            if not chunk:
                # Peer closed the connection before sending a full message.
                raise FeedException('Expected info message at this point.')

            self.unpacker.feed(chunk)
            for opcode, data in self.unpacker:
                if opcode != OP_INFO:
                    raise FeedException('Expected info message at this point.')

                name, rest = self._read_field(data)
                rand = bytes(rest)
                logger.debug('info message name: %s, rand: %r', name, rand)
                self.brokername = name

                # sendall: plain send() may transmit only part of the message.
                self.s.sendall(msgauth(rand, self.ident, self.secret))
                return

    def connect(self):
        """Open a fresh connection and perform the info/auth handshake.

        :raises FeedException: the TCP connection fails, the receive times
            out, or the first message is not an info message.
        """
        logger.info('connecting to %s:%s', self.host, self.port)

        # Drop any previous socket and any half-received message so bytes
        # from the old connection cannot corrupt framing on the new one.
        if getattr(self, 's', None) is not None:
            self.close()
        self.connected = False
        self.unpacker = FeedUnpack()

        self.s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.s.settimeout(self.timeout)
        try:
            self.s.connect((self.host, self.port))
        except Exception:
            # "Exception" rather than a bare except so Ctrl-C still works.
            self.s.close()
            raise FeedException('Could not connect to broker.')
        self.connected = True

        try:
            self._handshake()
        except Exception:
            self.connected = False
            self.s.close()
            raise

        self.s.settimeout(None)

    def _run(self, message_callback, error_callback):
        """Receive and dispatch messages until :meth:`stop` is called.

        When the broker closes the connection, one immediate reconnect is
        attempted; a failure there propagates as ``FeedException``.
        """
        while not self.stopped:
            while self.connected:
                d = self.s.recv(BUFSIZ)
                if not d:
                    self.connected = False
                    break

                self.unpacker.feed(d)
                for opcode, data in self.unpacker:
                    if opcode == OP_PUBLISH:
                        ident, rest = self._read_field(data)
                        chan, content = self._read_field(rest)
                        message_callback(ident, chan, bytes(content))
                    elif opcode == OP_ERROR:
                        error_callback(data)

                if self.stopped:
                    break

            if self.stopped:
                break
            self.connect()

    def run(self, message_callback, error_callback):
        """Run the receive loop, calling the callbacks for incoming messages.

        :param message_callback: called as ``(ident, channel, payload)``
            for every publish message; all three are byte strings.
        :param error_callback: called with the payload (``bytearray``) of
            every error message.

        Without ``reconnect`` this is a single pass of the receive loop and
        a failed reconnect raises ``FeedException``.  With ``reconnect``
        such failures are retried every ``sleepwait`` seconds.  In both
        modes the method returns once :meth:`stop` has been called.
        """
        if not self.reconnect:
            self._run(message_callback, error_callback)
            return

        while not self.stopped:
            try:
                self._run(message_callback, error_callback)
            except FeedException:
                self.connected = False
                logger.warning('Connection to broker failed, retrying.')

            # Reached after a failure, or after stop(); in the latter case
            # the loop below is skipped and run() returns.
            while not self.stopped:
                sleep(self.sleepwait)
                try:
                    self.connect()
                except FeedException:
                    continue  # broker still unreachable; wait and retry
                break

    def subscribe(self, chaninfo):
        """Send a subscribe message for one channel or an iterable of channels."""
        for chan in self._channels(chaninfo):
            self.s.sendall(msgsubscribe(self.ident, chan))

    def publish(self, chaninfo, data):
        """Publish *data* to one channel or to each channel of an iterable."""
        for chan in self._channels(chaninfo):
            self.s.sendall(msgpublish(self.ident, chan, data))

    def stop(self):
        """Ask the receive loop to exit; takes effect at its next check."""
        self.stopped = True

    def close(self):
        """Close the socket, logging instead of raising if that fails."""
        try:
            self.s.close()
        except Exception:
            logger.warning('Socket exception when closing.')


def new(host=None, port=10000, ident=None, secret=None, reconnect=True, sleepwait=20):
    """Create and connect an :class:`HPC` client.

    ``reconnect`` and ``sleepwait`` are passed by keyword; passed
    positionally, ``reconnect`` would land in HPC's ``timeout`` parameter
    and ``sleepwait`` would be dropped.

    :raises FeedException: the initial connection or handshake fails.
    """
    return HPC(host, port, ident, secret, reconnect=reconnect, sleepwait=sleepwait)
156