import socket # s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) # s.connect(('10.10.2.245', 554)) # s.send('OPTIONS rtsp://10.10.2.245:554/av0_1 RTSP/1.0\r\n'.encode('ascii')) # s.send('CSeq: 1\r\n'.encode('ascii')) # s.send('Require: implicit-play\r\n'.encode('ascii')) # s.send('Prox-Require: gzipped-messages\r\n\r\n'.encode('ascii')) # data = s.recv(1024) # print(data.decode('ascii')) # s.close() from urllib.parse import urlparse class Rtsp: def __init__(self, url, buffer_size=128*1024, chunk_size=1024, encoding='ascii'): self.raw_url = url self.parsed_url = urlparse(url) self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.cseq = 0 self.crlf = b'\r\n' self.endmark = self.crlf + self.crlf self.endmark_size = len(self.endmark) self.buffer_size = buffer_size self.chunk_size = chunk_size self.buffer = bytearray(self.buffer_size) self.encoding = encoding self.buffer_appending_index = 0 self.buffer_searching_index = 0 self.buffer_previous_index = 0 def connect(self): self.socket.connect((self.parsed_url.hostname, self.parsed_url.port)) def send(self, data): print(data) self.socket.send(data) def sendline(self, *args): text = args[0] if len(args) > 1: text %= args[1:] self.send(text.encode(self.encoding) + self.crlf) def sendend(self): self.send(self.crlf) self.send(self.crlf) def sendcseq(self): self.cseq += 1 self.sendline('CSeq: %d', self.cseq) return self.cseq def recv_resp(self): while True: chunk = self.socket.recv(self.chunk_size) if not chunk: return l = len(chunk) i = self.buffer_appending_index j = i + l if j > self.buffer_size: raise BufferError() self.buffer[i:j] = chunk start = i if i == 0 else i - self.endmark_size endmark_index = self.buffer.find(self.endmark, start, j) if ~endmark_index: frame = self.buffer[0:endmark_index] # cleanup buffer next_index = endmark_index + self.endmark_size rest_size = j - next_index self.buffer[0:rest_size] = self.buffer[next_index:j] self.buffer_appending_index = rest_size return frame else: self.buffer_appending_index = j def close(self): self.socket.close() def options(self): self.sendline('OPTIONS %s RTSP/1.0', self.raw_url) self.sendcseq() self.sendline('Require: implicit-play') self.sendend() resp = self.recv_resp() print(resp.decode(self.encoding)) def describe(self): self.sendline('DESCRIBE %s RTSP/1.0', self.raw_url) self.sendcseq() self.sendend() resp = self.recv_resp() print(resp.decode(self.encoding)) rtsp = Rtsp('rtsp://10.10.2.245:554/av0_1',chunk_size=10) rtsp.connect() rtsp.options() # rtsp.describe() rtsp.close() print(rtsp.buffer_appending_index)