[apps/tidep0084.git] / example / iot-gateway / node_modules / aws-iot-device-sdk / node_modules / websocket-stream / stream.js
1 var through = require('through2')
2 var duplexify = require('duplexify')
3 var WS = require('ws')
5 module.exports = WebSocketStream
/**
 * Wrap a WebSocket in an object-mode stream.
 *
 * Chunks written to the returned stream are passed to `socket.send`, and the
 * payload of every incoming message is pushed to the readable side as a
 * Buffer. The underlying socket is exposed as `stream.socket`.
 *
 * When the socket is not yet open, a duplexify stream is returned immediately;
 * it is wired up to the socket and emits 'connect' once the socket opens.
 *
 * @param {string|Object} target - An existing WebSocket object to wrap, or a
 *   value handed to the `ws` constructor to create a new socket.
 * @param {string|Array|Object} [protocols] - Sub-protocols for a new socket.
 *   A non-array object in this position is treated as `options`.
 * @param {Object} [options] - Forwarded to the `ws` constructor (except for
 *   native browser sockets). Also read here:
 *   `browserBufferSize` (default 512 KiB) and `browserBufferTimeout`
 *   (default 1000 ms), both used only on the browser write path.
 * @returns {Object} The stream, with a `socket` property.
 */
function WebSocketStream(target, protocols, options) {
  var stream, socket

  var isBrowser = process.title === 'browser'
  var isNative = !!global.WebSocket
  var socketWrite = isBrowser ? socketWriteBrowser : socketWriteNode
  var proxy = through.obj(socketWrite, socketEnd)

  if (protocols && !Array.isArray(protocols) && 'object' === typeof protocols) {
    // accept the "options" Object as the 2nd argument
    options = protocols
    protocols = null
  }

  if (!options) options = {}

  // browser only: sets the maximum socket buffer size before throttling
  var bufferSize = options.browserBufferSize || 1024 * 512

  // browser only: how long to wait when throttling
  var bufferTimeout = options.browserBufferTimeout || 1000

  if (typeof target === 'object') {
    // use existing WebSocket object that was passed in
    socket = target
  } else {
    // otherwise make a new one.
    // special constructor treatment for native websockets in browsers, see
    // https://github.com/maxogden/websocket-stream/issues/82
    if (isNative && isBrowser) {
      socket = new WS(target, protocols)
    } else {
      socket = new WS(target, protocols, options)
    }

    socket.binaryType = 'arraybuffer'
  }

  if (socket.readyState === WS.OPEN) {
    // was already open when passed in: no need to wait for onopen
    stream = proxy
  } else {
    // hand back a placeholder now; onopen attaches the proxy to it
    stream = duplexify.obj()
    socket.onopen = onopen
  }

  stream.socket = socket

  socket.onclose = onclose
  socket.onerror = onerror
  socket.onmessage = onmessage

  proxy.on('close', destroy)

  // Node write path: let the socket report completion through `next`.
  function socketWriteNode(chunk, enc, next) {
    socket.send(chunk, next)
  }

  // Browser write path: back off while the socket's send buffer is too full.
  function socketWriteBrowser(chunk, enc, next) {
    // Only wait for the buffer to drain while the socket is still open.
    // bufferedAmount does not reset once the connection closes, so retrying
    // on a closing/closed socket would reschedule this timer forever and
    // `next` would never be called.
    if (socket.readyState === WS.OPEN && socket.bufferedAmount > bufferSize) {
      setTimeout(socketWriteBrowser, bufferTimeout, chunk, enc, next)
      return
    }

    try {
      socket.send(chunk)
    } catch (err) {
      return next(err)
    }

    next()
  }

  // Flush handler of the proxy: the writable side finished, close the socket.
  function socketEnd(done) {
    socket.close()
    done()
  }

  function onopen() {
    stream.setReadable(proxy)
    stream.setWritable(proxy)
    stream.emit('connect')
  }

  function onclose() {
    stream.end()
    stream.destroy()
  }

  function onerror(err) {
    stream.destroy(err)
  }

  // Convert a message payload to a Buffer without the deprecated
  // `new Buffer()` constructor, which allocates uninitialised memory when
  // handed a number. On old runtimes `Buffer.from` is only the inherited
  // `Uint8Array.from`, so fall back to the constructor there.
  function toBuffer(data) {
    if (typeof Buffer.from === 'function' && Buffer.from !== Uint8Array.from) {
      return Buffer.from(data)
    }
    return new Buffer(data)
  }

  function onmessage(event) {
    var data = event.data
    // View the ArrayBuffer as bytes first so the Buffer gets its own copy.
    if (data instanceof ArrayBuffer) data = new Uint8Array(data)
    proxy.push(toBuffer(data))
  }

  function destroy() {
    socket.close()
  }

  return stream
}