[apps/tidep0084.git] / example / iot-gateway / node_modules / aws-iot-device-sdk / node_modules / mqtt / node_modules / mqtt-connection / connection.js
2 var generateStream = require('./lib/generateStream')
3 , parseStream = require('./lib/parseStream')
4 , Reduplexer = require('reduplexer')
5 , inherits = require('inherits')
6 , setImmediate = global.setImmediate
// Runtimes that expose no global setImmediate get a process.nextTick-based
// stand-in; an existing implementation is left untouched.
if (!setImmediate) {
  setImmediate = function(func) {
    process.nextTick(func);
  }
}
/**
 * Re-emits a packet on `this` as an event named after the packet's command,
 * so listeners can subscribe per packet type (e.g. 'publish').
 *
 * Must be invoked with `this` bound to an object exposing `emit`.
 *
 * @param {Object} packet packet object; `packet.cmd` becomes the event name
 */
function emitPacket(packet) {
  var eventName = packet.cmd
  this.emit(eventName, packet)
}
/**
 * Wraps a duplex stream in a packet-oriented connection.
 *
 * Data read from `duplex` is piped into the stream returned by
 * parseStream(opts); the stream returned by generateStream() is piped into
 * `duplex`. Both are then exposed through Reduplexer in object mode, with the
 * generate stream as the first argument and the parse stream as the second.
 *
 * May be called with or without `new`.
 *
 * @param {Object} duplex stream to wrap; must provide `pipe` and `on`
 * @param {Object} [opts] forwarded to parseStream; when `opts.notData` is
 *   exactly `true`, 'data' events are NOT re-emitted as per-command events
 */
function Connection(duplex, opts) {
  var isConstructCall = this instanceof Connection
  if (!isConstructCall) {
    return new Connection(duplex, opts)
  }

  var options = opts || {}
  var self = this

  var packetWriter = generateStream()
  var packetReader = parseStream(options)

  // Wire the underlying stream to the parser and the generator to the
  // underlying stream.
  duplex.pipe(packetReader)
  packetWriter.pipe(duplex)

  self.stream = duplex

  // Surface 'error' and 'close' from the underlying stream on this object.
  duplex.on('error', self.emit.bind(self, 'error'))
  duplex.on('close', self.emit.bind(self, 'close'))

  Reduplexer.call(self, packetWriter, packetReader, { objectMode: true })

  // MQTT.js basic default: re-emit each packet under its command name
  // unless the caller explicitly opted out.
  if (options.notData !== true) {
    self.on('data', emitPacket)
  }
}
// Make Connection a subclass of Reduplexer so that instances pick up its
// prototype methods; the constructor invokes Reduplexer.call(this, ...).
inherits(Connection, Reduplexer)
// Install one sender method per command name on Connection.prototype,
// e.g. connection.publish(opts, cb) or connection.pingreq(cb).
// The leading semicolon guards against the previous statement being parsed
// as an index expression.
;['connect',
  'connack',
  'publish',
  'puback',
  'pubrec',
  'pubrel',
  'pubcomp',
  'subscribe',
  'suback',
  'unsubscribe',
  'unsuback',
  'pingreq',
  'pingresp',
  'disconnect'].forEach(function(cmd) {
  /**
   * Writes a packet of type `cmd` to this connection.
   *
   * @param {Object} [opts] packet fields. The object is used as the packet
   *   itself and is mutated: its `cmd` property is set. Defaults to `{}`.
   *   A function passed here with no `cb` is treated as the callback.
   * @param {Function} [cb] scheduled with setImmediate once the write has
   *   been issued. It does NOT confirm that the data was flushed.
   */
  Connection.prototype[cmd] = function(opts, cb) {
    // Support the callback-only form (e.g. conn.pingreq(cb)); previously the
    // function was written as the packet and never invoked.
    if (typeof opts === 'function' && !cb) {
      cb = opts
      opts = {}
    }

    opts = opts || {}
    opts.cmd = cmd;

    // UGLY hack: the return value of write() is ignored. Ideally we would
    // listen for the 'drain' event and resume writing then, but this works
    // too.
    this.write(opts)
    if (cb)
      setImmediate(cb)
  }
});
/**
 * Tears down the underlying stream stored in `this.stream`.
 *
 * Calls the stream's `destroy()` when it has one; otherwise falls back to
 * `end()`.
 */
Connection.prototype.destroy = function() {
  var underlying = this.stream

  if (!underlying.destroy) {
    underlying.end()
    return
  }

  underlying.destroy()
}
// Public API: the Connection constructor is the module's export, with the
// parseStream and generateStream factories attached to it as properties.
Connection.parseStream = parseStream
Connection.generateStream = generateStream
module.exports = Connection