// receiver.js
  1. 'use strict';
  2. const { Writable } = require('stream');
  3. const PerMessageDeflate = require('./permessage-deflate');
  4. const {
  5. BINARY_TYPES,
  6. EMPTY_BUFFER,
  7. kStatusCode,
  8. kWebSocket
  9. } = require('./constants');
  10. const { concat, toArrayBuffer, unmask } = require('./buffer-util');
  11. const { isValidStatusCode, isValidUTF8 } = require('./validation');
  12. const FastBuffer = Buffer[Symbol.species];
  13. const promise = Promise.resolve();
  14. //
  15. // `queueMicrotask()` is not available in Node.js < 11.
  16. //
  17. const queueTask =
  18. typeof queueMicrotask === 'function' ? queueMicrotask : queueMicrotaskShim;
  19. const GET_INFO = 0;
  20. const GET_PAYLOAD_LENGTH_16 = 1;
  21. const GET_PAYLOAD_LENGTH_64 = 2;
  22. const GET_MASK = 3;
  23. const GET_DATA = 4;
  24. const INFLATING = 5;
  25. const DEFER_EVENT = 6;
  26. /**
  27. * HyBi Receiver implementation.
  28. *
  29. * @extends Writable
  30. */
  31. class Receiver extends Writable {
  32. /**
  33. * Creates a Receiver instance.
  34. *
  35. * @param {Object} [options] Options object
  36. * @param {Boolean} [options.allowSynchronousEvents=false] Specifies whether
  37. * any of the `'message'`, `'ping'`, and `'pong'` events can be emitted
  38. * multiple times in the same tick
  39. * @param {String} [options.binaryType=nodebuffer] The type for binary data
  40. * @param {Object} [options.extensions] An object containing the negotiated
  41. * extensions
  42. * @param {Boolean} [options.isServer=false] Specifies whether to operate in
  43. * client or server mode
  44. * @param {Number} [options.maxPayload=0] The maximum allowed message length
  45. * @param {Boolean} [options.skipUTF8Validation=false] Specifies whether or
  46. * not to skip UTF-8 validation for text and close messages
  47. */
  48. constructor(options = {}) {
  49. super();
  50. this._allowSynchronousEvents = !!options.allowSynchronousEvents;
  51. this._binaryType = options.binaryType || BINARY_TYPES[0];
  52. this._extensions = options.extensions || {};
  53. this._isServer = !!options.isServer;
  54. this._maxPayload = options.maxPayload | 0;
  55. this._skipUTF8Validation = !!options.skipUTF8Validation;
  56. this[kWebSocket] = undefined;
  57. this._bufferedBytes = 0;
  58. this._buffers = [];
  59. this._compressed = false;
  60. this._payloadLength = 0;
  61. this._mask = undefined;
  62. this._fragmented = 0;
  63. this._masked = false;
  64. this._fin = false;
  65. this._opcode = 0;
  66. this._totalPayloadLength = 0;
  67. this._messageLength = 0;
  68. this._fragments = [];
  69. this._errored = false;
  70. this._loop = false;
  71. this._state = GET_INFO;
  72. }
  73. /**
  74. * Implements `Writable.prototype._write()`.
  75. *
  76. * @param {Buffer} chunk The chunk of data to write
  77. * @param {String} encoding The character encoding of `chunk`
  78. * @param {Function} cb Callback
  79. * @private
  80. */
  81. _write(chunk, encoding, cb) {
  82. if (this._opcode === 0x08 && this._state == GET_INFO) return cb();
  83. this._bufferedBytes += chunk.length;
  84. this._buffers.push(chunk);
  85. this.startLoop(cb);
  86. }
  87. /**
  88. * Consumes `n` bytes from the buffered data.
  89. *
  90. * @param {Number} n The number of bytes to consume
  91. * @return {Buffer} The consumed bytes
  92. * @private
  93. */
  94. consume(n) {
  95. this._bufferedBytes -= n;
  96. if (n === this._buffers[0].length) return this._buffers.shift();
  97. if (n < this._buffers[0].length) {
  98. const buf = this._buffers[0];
  99. this._buffers[0] = new FastBuffer(
  100. buf.buffer,
  101. buf.byteOffset + n,
  102. buf.length - n
  103. );
  104. return new FastBuffer(buf.buffer, buf.byteOffset, n);
  105. }
  106. const dst = Buffer.allocUnsafe(n);
  107. do {
  108. const buf = this._buffers[0];
  109. const offset = dst.length - n;
  110. if (n >= buf.length) {
  111. dst.set(this._buffers.shift(), offset);
  112. } else {
  113. dst.set(new Uint8Array(buf.buffer, buf.byteOffset, n), offset);
  114. this._buffers[0] = new FastBuffer(
  115. buf.buffer,
  116. buf.byteOffset + n,
  117. buf.length - n
  118. );
  119. }
  120. n -= buf.length;
  121. } while (n > 0);
  122. return dst;
  123. }
  124. /**
  125. * Starts the parsing loop.
  126. *
  127. * @param {Function} cb Callback
  128. * @private
  129. */
  130. startLoop(cb) {
  131. this._loop = true;
  132. do {
  133. switch (this._state) {
  134. case GET_INFO:
  135. this.getInfo(cb);
  136. break;
  137. case GET_PAYLOAD_LENGTH_16:
  138. this.getPayloadLength16(cb);
  139. break;
  140. case GET_PAYLOAD_LENGTH_64:
  141. this.getPayloadLength64(cb);
  142. break;
  143. case GET_MASK:
  144. this.getMask();
  145. break;
  146. case GET_DATA:
  147. this.getData(cb);
  148. break;
  149. case INFLATING:
  150. case DEFER_EVENT:
  151. this._loop = false;
  152. return;
  153. }
  154. } while (this._loop);
  155. if (!this._errored) cb();
  156. }
  157. /**
  158. * Reads the first two bytes of a frame.
  159. *
  160. * @param {Function} cb Callback
  161. * @private
  162. */
  163. getInfo(cb) {
  164. if (this._bufferedBytes < 2) {
  165. this._loop = false;
  166. return;
  167. }
  168. const buf = this.consume(2);
  169. if ((buf[0] & 0x30) !== 0x00) {
  170. const error = this.createError(
  171. RangeError,
  172. 'RSV2 and RSV3 must be clear',
  173. true,
  174. 1002,
  175. 'WS_ERR_UNEXPECTED_RSV_2_3'
  176. );
  177. cb(error);
  178. return;
  179. }
  180. const compressed = (buf[0] & 0x40) === 0x40;
  181. if (compressed && !this._extensions[PerMessageDeflate.extensionName]) {
  182. const error = this.createError(
  183. RangeError,
  184. 'RSV1 must be clear',
  185. true,
  186. 1002,
  187. 'WS_ERR_UNEXPECTED_RSV_1'
  188. );
  189. cb(error);
  190. return;
  191. }
  192. this._fin = (buf[0] & 0x80) === 0x80;
  193. this._opcode = buf[0] & 0x0f;
  194. this._payloadLength = buf[1] & 0x7f;
  195. if (this._opcode === 0x00) {
  196. if (compressed) {
  197. const error = this.createError(
  198. RangeError,
  199. 'RSV1 must be clear',
  200. true,
  201. 1002,
  202. 'WS_ERR_UNEXPECTED_RSV_1'
  203. );
  204. cb(error);
  205. return;
  206. }
  207. if (!this._fragmented) {
  208. const error = this.createError(
  209. RangeError,
  210. 'invalid opcode 0',
  211. true,
  212. 1002,
  213. 'WS_ERR_INVALID_OPCODE'
  214. );
  215. cb(error);
  216. return;
  217. }
  218. this._opcode = this._fragmented;
  219. } else if (this._opcode === 0x01 || this._opcode === 0x02) {
  220. if (this._fragmented) {
  221. const error = this.createError(
  222. RangeError,
  223. `invalid opcode ${this._opcode}`,
  224. true,
  225. 1002,
  226. 'WS_ERR_INVALID_OPCODE'
  227. );
  228. cb(error);
  229. return;
  230. }
  231. this._compressed = compressed;
  232. } else if (this._opcode > 0x07 && this._opcode < 0x0b) {
  233. if (!this._fin) {
  234. const error = this.createError(
  235. RangeError,
  236. 'FIN must be set',
  237. true,
  238. 1002,
  239. 'WS_ERR_EXPECTED_FIN'
  240. );
  241. cb(error);
  242. return;
  243. }
  244. if (compressed) {
  245. const error = this.createError(
  246. RangeError,
  247. 'RSV1 must be clear',
  248. true,
  249. 1002,
  250. 'WS_ERR_UNEXPECTED_RSV_1'
  251. );
  252. cb(error);
  253. return;
  254. }
  255. if (
  256. this._payloadLength > 0x7d ||
  257. (this._opcode === 0x08 && this._payloadLength === 1)
  258. ) {
  259. const error = this.createError(
  260. RangeError,
  261. `invalid payload length ${this._payloadLength}`,
  262. true,
  263. 1002,
  264. 'WS_ERR_INVALID_CONTROL_PAYLOAD_LENGTH'
  265. );
  266. cb(error);
  267. return;
  268. }
  269. } else {
  270. const error = this.createError(
  271. RangeError,
  272. `invalid opcode ${this._opcode}`,
  273. true,
  274. 1002,
  275. 'WS_ERR_INVALID_OPCODE'
  276. );
  277. cb(error);
  278. return;
  279. }
  280. if (!this._fin && !this._fragmented) this._fragmented = this._opcode;
  281. this._masked = (buf[1] & 0x80) === 0x80;
  282. if (this._isServer) {
  283. if (!this._masked) {
  284. const error = this.createError(
  285. RangeError,
  286. 'MASK must be set',
  287. true,
  288. 1002,
  289. 'WS_ERR_EXPECTED_MASK'
  290. );
  291. cb(error);
  292. return;
  293. }
  294. } else if (this._masked) {
  295. const error = this.createError(
  296. RangeError,
  297. 'MASK must be clear',
  298. true,
  299. 1002,
  300. 'WS_ERR_UNEXPECTED_MASK'
  301. );
  302. cb(error);
  303. return;
  304. }
  305. if (this._payloadLength === 126) this._state = GET_PAYLOAD_LENGTH_16;
  306. else if (this._payloadLength === 127) this._state = GET_PAYLOAD_LENGTH_64;
  307. else this.haveLength(cb);
  308. }
  309. /**
  310. * Gets extended payload length (7+16).
  311. *
  312. * @param {Function} cb Callback
  313. * @private
  314. */
  315. getPayloadLength16(cb) {
  316. if (this._bufferedBytes < 2) {
  317. this._loop = false;
  318. return;
  319. }
  320. this._payloadLength = this.consume(2).readUInt16BE(0);
  321. this.haveLength(cb);
  322. }
  323. /**
  324. * Gets extended payload length (7+64).
  325. *
  326. * @param {Function} cb Callback
  327. * @private
  328. */
  329. getPayloadLength64(cb) {
  330. if (this._bufferedBytes < 8) {
  331. this._loop = false;
  332. return;
  333. }
  334. const buf = this.consume(8);
  335. const num = buf.readUInt32BE(0);
  336. //
  337. // The maximum safe integer in JavaScript is 2^53 - 1. An error is returned
  338. // if payload length is greater than this number.
  339. //
  340. if (num > Math.pow(2, 53 - 32) - 1) {
  341. const error = this.createError(
  342. RangeError,
  343. 'Unsupported WebSocket frame: payload length > 2^53 - 1',
  344. false,
  345. 1009,
  346. 'WS_ERR_UNSUPPORTED_DATA_PAYLOAD_LENGTH'
  347. );
  348. cb(error);
  349. return;
  350. }
  351. this._payloadLength = num * Math.pow(2, 32) + buf.readUInt32BE(4);
  352. this.haveLength(cb);
  353. }
  354. /**
  355. * Payload length has been read.
  356. *
  357. * @param {Function} cb Callback
  358. * @private
  359. */
  360. haveLength(cb) {
  361. if (this._payloadLength && this._opcode < 0x08) {
  362. this._totalPayloadLength += this._payloadLength;
  363. if (this._totalPayloadLength > this._maxPayload && this._maxPayload > 0) {
  364. const error = this.createError(
  365. RangeError,
  366. 'Max payload size exceeded',
  367. false,
  368. 1009,
  369. 'WS_ERR_UNSUPPORTED_MESSAGE_LENGTH'
  370. );
  371. cb(error);
  372. return;
  373. }
  374. }
  375. if (this._masked) this._state = GET_MASK;
  376. else this._state = GET_DATA;
  377. }
  378. /**
  379. * Reads mask bytes.
  380. *
  381. * @private
  382. */
  383. getMask() {
  384. if (this._bufferedBytes < 4) {
  385. this._loop = false;
  386. return;
  387. }
  388. this._mask = this.consume(4);
  389. this._state = GET_DATA;
  390. }
  391. /**
  392. * Reads data bytes.
  393. *
  394. * @param {Function} cb Callback
  395. * @private
  396. */
  397. getData(cb) {
  398. let data = EMPTY_BUFFER;
  399. if (this._payloadLength) {
  400. if (this._bufferedBytes < this._payloadLength) {
  401. this._loop = false;
  402. return;
  403. }
  404. data = this.consume(this._payloadLength);
  405. if (
  406. this._masked &&
  407. (this._mask[0] | this._mask[1] | this._mask[2] | this._mask[3]) !== 0
  408. ) {
  409. unmask(data, this._mask);
  410. }
  411. }
  412. if (this._opcode > 0x07) {
  413. this.controlMessage(data, cb);
  414. return;
  415. }
  416. if (this._compressed) {
  417. this._state = INFLATING;
  418. this.decompress(data, cb);
  419. return;
  420. }
  421. if (data.length) {
  422. //
  423. // This message is not compressed so its length is the sum of the payload
  424. // length of all fragments.
  425. //
  426. this._messageLength = this._totalPayloadLength;
  427. this._fragments.push(data);
  428. }
  429. this.dataMessage(cb);
  430. }
  431. /**
  432. * Decompresses data.
  433. *
  434. * @param {Buffer} data Compressed data
  435. * @param {Function} cb Callback
  436. * @private
  437. */
  438. decompress(data, cb) {
  439. const perMessageDeflate = this._extensions[PerMessageDeflate.extensionName];
  440. perMessageDeflate.decompress(data, this._fin, (err, buf) => {
  441. if (err) return cb(err);
  442. if (buf.length) {
  443. this._messageLength += buf.length;
  444. if (this._messageLength > this._maxPayload && this._maxPayload > 0) {
  445. const error = this.createError(
  446. RangeError,
  447. 'Max payload size exceeded',
  448. false,
  449. 1009,
  450. 'WS_ERR_UNSUPPORTED_MESSAGE_LENGTH'
  451. );
  452. cb(error);
  453. return;
  454. }
  455. this._fragments.push(buf);
  456. }
  457. this.dataMessage(cb);
  458. if (this._state === GET_INFO) this.startLoop(cb);
  459. });
  460. }
  461. /**
  462. * Handles a data message.
  463. *
  464. * @param {Function} cb Callback
  465. * @private
  466. */
  467. dataMessage(cb) {
  468. if (!this._fin) {
  469. this._state = GET_INFO;
  470. return;
  471. }
  472. const messageLength = this._messageLength;
  473. const fragments = this._fragments;
  474. this._totalPayloadLength = 0;
  475. this._messageLength = 0;
  476. this._fragmented = 0;
  477. this._fragments = [];
  478. if (this._opcode === 2) {
  479. let data;
  480. if (this._binaryType === 'nodebuffer') {
  481. data = concat(fragments, messageLength);
  482. } else if (this._binaryType === 'arraybuffer') {
  483. data = toArrayBuffer(concat(fragments, messageLength));
  484. } else {
  485. data = fragments;
  486. }
  487. //
  488. // If the state is `INFLATING`, it means that the frame data was
  489. // decompressed asynchronously, so there is no need to defer the event
  490. // as it will be emitted asynchronously anyway.
  491. //
  492. if (this._state === INFLATING || this._allowSynchronousEvents) {
  493. this.emit('message', data, true);
  494. this._state = GET_INFO;
  495. } else {
  496. this._state = DEFER_EVENT;
  497. queueTask(() => {
  498. this.emit('message', data, true);
  499. this._state = GET_INFO;
  500. this.startLoop(cb);
  501. });
  502. }
  503. } else {
  504. const buf = concat(fragments, messageLength);
  505. if (!this._skipUTF8Validation && !isValidUTF8(buf)) {
  506. const error = this.createError(
  507. Error,
  508. 'invalid UTF-8 sequence',
  509. true,
  510. 1007,
  511. 'WS_ERR_INVALID_UTF8'
  512. );
  513. cb(error);
  514. return;
  515. }
  516. if (this._state === INFLATING || this._allowSynchronousEvents) {
  517. this.emit('message', buf, false);
  518. this._state = GET_INFO;
  519. } else {
  520. this._state = DEFER_EVENT;
  521. queueTask(() => {
  522. this.emit('message', buf, false);
  523. this._state = GET_INFO;
  524. this.startLoop(cb);
  525. });
  526. }
  527. }
  528. }
  529. /**
  530. * Handles a control message.
  531. *
  532. * @param {Buffer} data Data to handle
  533. * @return {(Error|RangeError|undefined)} A possible error
  534. * @private
  535. */
  536. controlMessage(data, cb) {
  537. if (this._opcode === 0x08) {
  538. if (data.length === 0) {
  539. this._loop = false;
  540. this.emit('conclude', 1005, EMPTY_BUFFER);
  541. this.end();
  542. } else {
  543. const code = data.readUInt16BE(0);
  544. if (!isValidStatusCode(code)) {
  545. const error = this.createError(
  546. RangeError,
  547. `invalid status code ${code}`,
  548. true,
  549. 1002,
  550. 'WS_ERR_INVALID_CLOSE_CODE'
  551. );
  552. cb(error);
  553. return;
  554. }
  555. const buf = new FastBuffer(
  556. data.buffer,
  557. data.byteOffset + 2,
  558. data.length - 2
  559. );
  560. if (!this._skipUTF8Validation && !isValidUTF8(buf)) {
  561. const error = this.createError(
  562. Error,
  563. 'invalid UTF-8 sequence',
  564. true,
  565. 1007,
  566. 'WS_ERR_INVALID_UTF8'
  567. );
  568. cb(error);
  569. return;
  570. }
  571. this._loop = false;
  572. this.emit('conclude', code, buf);
  573. this.end();
  574. }
  575. this._state = GET_INFO;
  576. return;
  577. }
  578. if (this._allowSynchronousEvents) {
  579. this.emit(this._opcode === 0x09 ? 'ping' : 'pong', data);
  580. this._state = GET_INFO;
  581. } else {
  582. this._state = DEFER_EVENT;
  583. queueTask(() => {
  584. this.emit(this._opcode === 0x09 ? 'ping' : 'pong', data);
  585. this._state = GET_INFO;
  586. this.startLoop(cb);
  587. });
  588. }
  589. }
  590. /**
  591. * Builds an error object.
  592. *
  593. * @param {function(new:Error|RangeError)} ErrorCtor The error constructor
  594. * @param {String} message The error message
  595. * @param {Boolean} prefix Specifies whether or not to add a default prefix to
  596. * `message`
  597. * @param {Number} statusCode The status code
  598. * @param {String} errorCode The exposed error code
  599. * @return {(Error|RangeError)} The error
  600. * @private
  601. */
  602. createError(ErrorCtor, message, prefix, statusCode, errorCode) {
  603. this._loop = false;
  604. this._errored = true;
  605. const err = new ErrorCtor(
  606. prefix ? `Invalid WebSocket frame: ${message}` : message
  607. );
  608. Error.captureStackTrace(err, this.createError);
  609. err.code = errorCode;
  610. err[kStatusCode] = statusCode;
  611. return err;
  612. }
  613. }
  614. module.exports = Receiver;
  615. /**
  616. * A shim for `queueMicrotask()`.
  617. *
  618. * @param {Function} cb Callback
  619. */
  620. function queueMicrotaskShim(cb) {
  621. promise.then(cb).catch(throwErrorNextTick);
  622. }
  623. /**
  624. * Throws an error.
  625. *
  626. * @param {Error} err The error to throw
  627. * @private
  628. */
  629. function throwError(err) {
  630. throw err;
  631. }
  632. /**
  633. * Throws an error in the next tick.
  634. *
  635. * @param {Error} err The error to throw
  636. * @private
  637. */
  638. function throwErrorNextTick(err) {
  639. process.nextTick(throwError, err);
  640. }