# **STOMP Over Web Socket** is a JavaScript STOMP Client using # [HTML5 Web Sockets API](http://www.w3.org/TR/websockets). # # * Copyright (C) 2010-2012 [Jeff Mesnil](http://jmesnil.net/) # * Copyright (C) 2012 [FuseSource, Inc.](http://fusesource.com) # # This library supports: # # * [STOMP 1.0](http://stomp.github.com/stomp-specification-1.0.html) # * [STOMP 1.1](http://stomp.github.com/stomp-specification-1.1.html) # # The library is accessed through the `Stomp` object that is set on the `window` # when running in a Web browser. ### Stomp Over WebSocket http://www.jmesnil.net/stomp-websocket/doc/ | Apache License V2.0 Copyright (C) 2010-2013 [Jeff Mesnil](http://jmesnil.net/) Copyright (C) 2012 [FuseSource, Inc.](http://fusesource.com) ### # Define constants for bytes used throughout the code. Byte = # LINEFEED byte (octet 10) LF: '\x0A' # NULL byte (octet 0) NULL: '\x00' # ##[STOMP Frame](http://stomp.github.com/stomp-specification-1.1.html#STOMP_Frames) Class class Frame # Frame constructor constructor: (@command, @headers={}, @body='') -> # Provides a textual representation of the frame # suitable to be sent to the server toString: -> lines = [@command] skipContentLength = if (@headers['content-length'] == false) then true else false delete @headers['content-length'] if skipContentLength for own name, value of @headers lines.push("#{name}:#{value}") if @body && !skipContentLength lines.push("content-length:#{Frame.sizeOfUTF8(@body)}") lines.push(Byte.LF + @body) return lines.join(Byte.LF) # Compute the size of a UTF-8 string by counting its number of bytes # (and not the number of characters composing the string) @sizeOfUTF8: (s)-> if s encodeURI(s).match(/%..|./g).length else 0 # Unmarshall a single STOMP frame from a `data` string unmarshallSingle= (data) -> # search for 2 consecutives LF byte to split the command # and headers from the body divider = data.search(///#{Byte.LF}#{Byte.LF}///) headerLines = data.substring(0, divider).split(Byte.LF) command = headerLines.shift() headers = {} # utility function to trim any whitespace before and after a string trim= (str) -> str.replace(/^\s+|\s+$/g,'') # Parse headers in reverse order so that for repeated headers, the 1st # value is used for line in headerLines.reverse() idx = line.indexOf(':') headers[trim(line.substring(0, idx))] = trim(line.substring(idx + 1)) # Parse body # check for content-length or topping at the first NULL byte found. body = '' # skip the 2 LF bytes that divides the headers from the body start = divider + 2 if headers['content-length'] len = parseInt headers['content-length'] body = ('' + data).substring(start, start + len) else chr = null for i in [start...data.length] chr = data.charAt(i) break if chr is Byte.NULL body += chr return new Frame(command, headers, body) # Split the data before unmarshalling every single STOMP frame. # Web socket servers can send multiple frames in a single websocket message. # If the message size exceeds the websocket message size, then a single # frame can be fragmented across multiple messages. # # `datas` is a string. # # returns an *array* of Frame objects @unmarshall: (datas) -> # Ugly list comprehension to split and unmarshall *multiple STOMP frames* # contained in a *single WebSocket frame*. # The data is split when a NULL byte (followed by zero or many LF bytes) is # found frames = datas.split(///#{Byte.NULL}#{Byte.LF}*///) r = frames: [] partial: '' r.frames = (unmarshallSingle(frame) for frame in frames[0..-2]) # If this contains a final full message or just a acknowledgement of a PING # without any other content, process this frame, otherwise return the # contents of the buffer to the caller. last_frame = frames[-1..][0] if last_frame is Byte.LF or (last_frame.search ///#{Byte.NULL}#{Byte.LF}*$///) isnt -1 r.frames.push(unmarshallSingle(last_frame)) else r.partial = last_frame return r # Marshall a Stomp frame @marshall: (command, headers, body) -> frame = new Frame(command, headers, body) return frame.toString() + Byte.NULL # ##STOMP Client Class # # All STOMP protocol is exposed as methods of this class (`connect()`, # `send()`, etc.) class Client constructor: (@ws) -> @ws.binaryType = "arraybuffer" # used to index subscribers @counter = 0 @connected = false # Heartbeat properties of the client @heartbeat = { # send heartbeat every 10s by default (value is in ms) outgoing: 10000 # expect to receive server heartbeat at least every 10s by default # (value in ms) incoming: 10000 } # maximum *WebSocket* frame size sent by the client. If the STOMP frame # is bigger than this value, the STOMP frame will be sent using multiple # WebSocket frames (default is 16KiB) @maxWebSocketFrameSize = 16*1024 # subscription callbacks indexed by subscriber's ID @subscriptions = {} @partialData = '' # ### Debugging # # By default, debug messages are logged in the window's console if it is defined. # This method is called for every actual transmission of the STOMP frames over the # WebSocket. # # It is possible to set a `debug(message)` method # on a client instance to handle differently the debug messages: # # client.debug = function(str) { # // append the debug log to a #debug div # $("#debug").append(str + "\n"); # }; debug: (message) -> window?.console?.log message # Utility method to get the current timestamp (Date.now is not defined in IE8) now= -> if Date.now then Date.now() else new Date().valueOf # Base method to transmit any stomp frame _transmit: (command, headers, body) -> out = Frame.marshall(command, headers, body) @debug? ">>> " + out # if necessary, split the *STOMP* frame to send it on many smaller # *WebSocket* frames while(true) if out.length > @maxWebSocketFrameSize @ws.send(out.substring(0, @maxWebSocketFrameSize)) out = out.substring(@maxWebSocketFrameSize) @debug? "remaining = " + out.length else return @ws.send(out) # Heart-beat negotiation _setupHeartbeat: (headers) -> return unless headers.version in [Stomp.VERSIONS.V1_1, Stomp.VERSIONS.V1_2] # heart-beat header received from the server looks like: # # heart-beat: sx, sy [serverOutgoing, serverIncoming] = (parseInt(v) for v in headers['heart-beat'].split(",")) unless @heartbeat.outgoing == 0 or serverIncoming == 0 ttl = Math.max(@heartbeat.outgoing, serverIncoming) @debug? "send PING every #{ttl}ms" # The `Stomp.setInterval` is a wrapper to handle regular callback # that depends on the runtime environment (Web browser or node.js app) @pinger = Stomp.setInterval ttl, => @ws.send Byte.LF @debug? ">>> PING" unless @heartbeat.incoming == 0 or serverOutgoing == 0 ttl = Math.max(@heartbeat.incoming, serverOutgoing) @debug? "check PONG every #{ttl}ms" @ponger = Stomp.setInterval ttl, => delta = now() - @serverActivity # We wait twice the TTL to be flexible on window's setInterval calls if delta > ttl * 2 @debug? "did not receive server activity for the last #{delta}ms" @ws.close() # parse the arguments number and type to find the headers, connectCallback and # (eventually undefined) errorCallback _parseConnect: (args...) -> headers = {} switch args.length when 2 [headers, connectCallback] = args when 3 if args[1] instanceof Function [headers, connectCallback, errorCallback] = args else [headers.login, headers.passcode, connectCallback] = args when 4 [headers.login, headers.passcode, connectCallback, errorCallback] = args else [headers.login, headers.passcode, connectCallback, errorCallback, headers.host] = args [headers, connectCallback, errorCallback] # [CONNECT Frame](http://stomp.github.com/stomp-specification-1.1.html#CONNECT_or_STOMP_Frame) # # The `connect` method accepts different number of arguments and types: # # * `connect(headers, connectCallback)` # * `connect(headers, connectCallback, errorCallback)` # * `connect(login, passcode, connectCallback)` # * `connect(login, passcode, connectCallback, errorCallback)` # * `connect(login, passcode, connectCallback, errorCallback, host)` # # The errorCallback is optional and the 2 first forms allow to pass other # headers in addition to `client`, `passcode` and `host`. connect: (args...) -> out = @_parseConnect(args...) [headers, @connectCallback, errorCallback] = out @debug? "Opening Web Socket..." @ws.onmessage = (evt) => data = if typeof(ArrayBuffer) != 'undefined' and evt.data instanceof ArrayBuffer # the data is stored inside an ArrayBuffer, we decode it to get the # data as a String arr = new Uint8Array(evt.data) @debug? "--- got data length: #{arr.length}" # Return a string formed by all the char codes stored in the Uint8array (String.fromCharCode(c) for c in arr).join('') else # take the data directly from the WebSocket `data` field evt.data @serverActivity = now() if data == Byte.LF # heartbeat @debug? "<<< PONG" return @debug? "<<< #{data}" # Handle STOMP frames received from the server # The unmarshall function returns the frames parsed and any remaining # data from partial frames. unmarshalledData = Frame.unmarshall(@partialData + data) @partialData = unmarshalledData.partial for frame in unmarshalledData.frames switch frame.command # [CONNECTED Frame](http://stomp.github.com/stomp-specification-1.1.html#CONNECTED_Frame) when "CONNECTED" @debug? "connected to server #{frame.headers.server}" @connected = true @_setupHeartbeat(frame.headers) @connectCallback? frame # [MESSAGE Frame](http://stomp.github.com/stomp-specification-1.1.html#MESSAGE) when "MESSAGE" # the `onreceive` callback is registered when the client calls # `subscribe()`. # If there is registered subscription for the received message, # we used the default `onreceive` method that the client can set. # This is useful for subscriptions that are automatically created # on the browser side (e.g. [RabbitMQ's temporary # queues](http://www.rabbitmq.com/stomp.html)). subscription = frame.headers.subscription onreceive = @subscriptions[subscription] or @onreceive if onreceive client = this messageID = frame.headers["message-id"] # add `ack()` and `nack()` methods directly to the returned frame # so that a simple call to `message.ack()` can acknowledge the message. frame.ack = (headers = {}) => client .ack messageID , subscription, headers frame.nack = (headers = {}) => client .nack messageID, subscription, headers onreceive frame else @debug? "Unhandled received MESSAGE: #{frame}" # [RECEIPT Frame](http://stomp.github.com/stomp-specification-1.1.html#RECEIPT) # # The client instance can set its `onreceipt` field to a function taking # a frame argument that will be called when a receipt is received from # the server: # # client.onreceipt = function(frame) { # receiptID = frame.headers['receipt-id']; # ... # } when "RECEIPT" @onreceipt?(frame) # [ERROR Frame](http://stomp.github.com/stomp-specification-1.1.html#ERROR) when "ERROR" errorCallback?(frame) else @debug? "Unhandled frame: #{frame}" @ws.onclose = => msg = "Whoops! Lost connection to #{@ws.url}" @debug?(msg) @_cleanUp() errorCallback?(msg) @ws.onopen = => @debug?('Web Socket Opened...') headers["accept-version"] = Stomp.VERSIONS.supportedVersions() headers["heart-beat"] = [@heartbeat.outgoing, @heartbeat.incoming].join(',') @_transmit "CONNECT", headers # [DISCONNECT Frame](http://stomp.github.com/stomp-specification-1.1.html#DISCONNECT) disconnect: (disconnectCallback, headers={}) -> @_transmit "DISCONNECT", headers # Discard the onclose callback to avoid calling the errorCallback when # the client is properly disconnected. @ws.onclose = null @ws.close() @_cleanUp() disconnectCallback?() # Clean up client resources when it is disconnected or the server did not # send heart beats in a timely fashion _cleanUp: () -> @connected = false Stomp.clearInterval @pinger if @pinger Stomp.clearInterval @ponger if @ponger # [SEND Frame](http://stomp.github.com/stomp-specification-1.1.html#SEND) # # * `destination` is MANDATORY. send: (destination, headers={}, body='') -> headers.destination = destination @_transmit "SEND", headers, body # [SUBSCRIBE Frame](http://stomp.github.com/stomp-specification-1.1.html#SUBSCRIBE) subscribe: (destination, callback, headers={}) -> # for convenience if the `id` header is not set, we create a new one for this client # that will be returned to be able to unsubscribe this subscription unless headers.id headers.id = "sub-" + @counter++ headers.destination = destination @subscriptions[headers.id] = callback @_transmit "SUBSCRIBE", headers client = this return { id: headers.id unsubscribe: -> client.unsubscribe headers.id } # [UNSUBSCRIBE Frame](http://stomp.github.com/stomp-specification-1.1.html#UNSUBSCRIBE) # # * `id` is MANDATORY. # # It is preferable to unsubscribe from a subscription by calling # `unsubscribe()` directly on the object returned by `client.subscribe()`: # # var subscription = client.subscribe(destination, onmessage); # ... # subscription.unsubscribe(); unsubscribe: (id) -> delete @subscriptions[id] @_transmit "UNSUBSCRIBE", { id: id } # [BEGIN Frame](http://stomp.github.com/stomp-specification-1.1.html#BEGIN) # # If no transaction ID is passed, one will be created automatically begin: (transaction) -> txid = transaction || "tx-" + @counter++ @_transmit "BEGIN", { transaction: txid } client = this return { id: txid commit: -> client.commit txid abort: -> client.abort txid } # [COMMIT Frame](http://stomp.github.com/stomp-specification-1.1.html#COMMIT) # # * `transaction` is MANDATORY. # # It is preferable to commit a transaction by calling `commit()` directly on # the object returned by `client.begin()`: # # var tx = client.begin(txid); # ... # tx.commit(); commit: (transaction) -> @_transmit "COMMIT", { transaction: transaction } # [ABORT Frame](http://stomp.github.com/stomp-specification-1.1.html#ABORT) # # * `transaction` is MANDATORY. # # It is preferable to abort a transaction by calling `abort()` directly on # the object returned by `client.begin()`: # # var tx = client.begin(txid); # ... # tx.abort(); abort: (transaction) -> @_transmit "ABORT", { transaction: transaction } # [ACK Frame](http://stomp.github.com/stomp-specification-1.1.html#ACK) # # * `messageID` & `subscription` are MANDATORY. # # It is preferable to acknowledge a message by calling `ack()` directly # on the message handled by a subscription callback: # # client.subscribe(destination, # function(message) { # // process the message # // acknowledge it # message.ack(); # }, # {'ack': 'client'} # ); ack: (messageID, subscription, headers = {}) -> headers["message-id"] = messageID headers.subscription = subscription @_transmit "ACK", headers # [NACK Frame](http://stomp.github.com/stomp-specification-1.1.html#NACK) # # * `messageID` & `subscription` are MANDATORY. # # It is preferable to nack a message by calling `nack()` directly on the # message handled by a subscription callback: # # client.subscribe(destination, # function(message) { # // process the message # // an error occurs, nack it # message.nack(); # }, # {'ack': 'client'} # ); nack: (messageID, subscription, headers = {}) -> headers["message-id"] = messageID headers.subscription = subscription @_transmit "NACK", headers # ##The `Stomp` Object Stomp = VERSIONS: V1_0: '1.0' V1_1: '1.1' V1_2: '1.2' # Versions of STOMP specifications supported supportedVersions: -> '1.1,1.0' # This method creates a WebSocket client that is connected to # the STOMP server located at the url. client: (url, protocols = ['v10.stomp', 'v11.stomp']) -> # This is a hack to allow another implementation than the standard # HTML5 WebSocket class. # # It is possible to use another class by calling # # Stomp.WebSocketClass = MozWebSocket # # *prior* to call `Stomp.client()`. # # This hack is deprecated and `Stomp.over()` method should be used # instead. klass = Stomp.WebSocketClass || WebSocket ws = new klass(url, protocols) new Client ws # This method is an alternative to `Stomp.client()` to let the user # specify the WebSocket to use (either a standard HTML5 WebSocket or # a similar object). over: (ws) -> new Client ws # For testing purpose, expose the Frame class inside Stomp to be able to # marshall/unmarshall frames Frame: Frame # # `Stomp` object exportation # export as CommonJS module if exports? exports.Stomp = Stomp # export in the Web Browser if window? # in the Web browser, rely on `window.setInterval` to handle heart-beats Stomp.setInterval= (interval, f) -> window.setInterval f, interval Stomp.clearInterval= (id) -> window.clearInterval id window.Stomp = Stomp # or in the current object (e.g. a WebWorker) else if !exports self.Stomp = Stomp