/** * * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. * */ /* Version 0.1 : R Jewson (rjewson at gmail dot com). First release, only for receipt of messages. Version 0.2s : Sheer Pullen (jonathanpullen at gmail dot com). Binary data transmission and reception Version 0.3s : Sheer Pullen (jonathanpullen at gmail dot com). Support for multiple subscription channels and support for ASCII message reception and support for queue Version 0.05 : Sheer Pullen - massive improvements to input packet handler Version 0.55 : Sheer Pullen - attempt to add legible comments to the project Version 0.56 : Sheer Pullen - Support for session disconnect */ package com.sheer.stomp { import flash.display.*; import flash.events.Event; import flash.events.IOErrorEvent; import flash.events.ProgressEvent; import flash.events.SecurityErrorEvent; import flash.events.TimerEvent; import flash.net.Socket; import flash.system.Security; import flash.utils.ByteArray; import flash.utils.Timer; public class STOMPClient extends Sprite { // we could extend anything eventish.. 
// sprite is just what the original code used

        // Interval presets. MEDIUM_INT is used as the delay (milliseconds) of the
        // auto-reconnect timer; the other two are not referenced in the visible code.
        static public const SHORT_INT:int = 1000;
        static public const MEDIUM_INT:int = 5000;
        static public const LONG_INT:int = 10000;

        // Delay (milliseconds) of the one-shot timer that batches outgoing flushes, see ioFlush()
        public var ioTimerWait:Number = 50;
        // Once more than this many writes are queued while the timer is pending,
        // ioFlush() flushes immediately instead of waiting for the timer
        public var ioThreshold:Number = 8;

        private var socket:Socket = new Socket(); // This socket is what we will be communicating over
        private var server:String;
        private var port:int;
        private var _stompLogin:String = new String;
        private var _stompPassword:String = new String;
        public var autoReconnect:Boolean = true;
        private var defaultQueue:String;
        private var stompMsg:STOMPMsg = new STOMPMsg;
        private var socketConnected:Boolean = false;
        private var protocolPending:Boolean = false;
        private var protocolConnected:Boolean = false;
        private var expectDisconnect:Boolean = false;
        private var expectData:Boolean = false;
        private var binary:Boolean = false;
        private var inputState:Number = 0;
        private var headerIndex:Number = 0;
        private var firstTimeAllStarted:Boolean = false;
        public var useIOTimer:Boolean = false;
        public var sessionID:String;
        public var setInterval:int = 0;
        private var subscriptions:Array = new Array();
        private var outputBuffer:ByteArray = new ByteArray;
        private var objectBuffer:ByteArray = new ByteArray;
        private var inputBuffer:ByteArray = new ByteArray;
        private var textBuffer:String = new String();
        private var objectBytesToRead:Number = 0;
        private var timer:Timer;
        private var subTimer:Timer; // timer for subscription tick - when fires, channels that aren't subscribed yet get subscribed
        private var connectTimer:Timer;
        private var ioTimer:Timer; // part of new (r1300) timer based outgoing traffic
        private var ioCount:int;
        public var connectTime:Date = null;
        public var disconnectTime:Date = null;
        public var connectAttempts:int = 0;
        public var errorMessages:Array = new Array();
        public var headers:Array = new Array();

        /**
         * Creates the client, wires the socket event handlers and prepares the
         * incoming-message state. No connection is attempted until connect() is called.
         */
        public function STOMPClient( ) {
            /* We set up event listeners for the various events that we expect to get from the socket */
            socket.addEventListener( flash.events.Event.CONNECT, onConnect );
            socket.addEventListener( flash.events.Event.CLOSE, onClose );
            socket.addEventListener( flash.events.ProgressEvent.SOCKET_DATA, onData);
            socket.addEventListener( IOErrorEvent.IO_ERROR, onIOError );
            socket.addEventListener( SecurityErrorEvent.SECURITY_ERROR, onSecurityError );
            /* ResetIncoming prepares the data structures for incoming data on the socket */
            ResetIncoming();
        }

        /**
         * (Re)starts the repeating subscription timer with the given interval.
         * Does nothing when the requested interval is already the active one.
         *
         * @param interval timer delay in milliseconds
         */
        public function setTimer( interval:int ):void {
            /* legacy code from codehaus */
            if (setInterval == interval) return;
            if (timer != null) {
                timer.stop();
                // detach our handler from the timer we are about to replace
                timer.removeEventListener(TimerEvent.TIMER, onSubTick);
            }
            timer = new Timer(interval);
            timer.addEventListener(TimerEvent.TIMER, onSubTick);
            timer.start();
            setInterval = interval;
        }

        /** Timer handler: runs processSubscriptionList() on every tick of the subscription timer. */
        public function onSubTick( event:TimerEvent ):void {
            processSubscriptionList();
        }

        /**
         * Cancels any pending batched flush and flushes the socket right away.
         * The flush is skipped when the socket is not connected, matching
         * ioFlush() and ioTimerTick(); Socket.flush() throws on a closed socket.
         */
        public function flush():void {
            if (ioTimer) {
                ioTimer.stop();
                ioTimer.removeEventListener(TimerEvent.TIMER, ioTimerTick);
            }
            ioTimer = null;
            ioCount = 0;
            if (socket.connected) socket.flush();
        }

        /**
         * Flushes outgoing data, either immediately or batched.
         * With useIOTimer set, the first write arms a one-shot timer of ioTimerWait
         * milliseconds; further writes are counted and force an immediate flush once
         * the count exceeds ioThreshold. Without useIOTimer the socket is flushed
         * directly (when connected).
         */
        public function ioFlush():void {
            if (useIOTimer) {
                ioCount++;
                if (!ioTimer) {
                    ioTimer = new Timer(ioTimerWait, 1);
                    ioTimer.addEventListener(TimerEvent.TIMER, ioTimerTick);
                    ioTimer.start();
                } else if (ioCount > ioThreshold) {
                    ioTimerTick(null);
                }
            } else {
                if (socket.connected) socket.flush();
            }
        }

        /**
         * Performs the batched flush. Called by the IO timer, or directly by
         * ioFlush() (with a null event) when the write threshold is exceeded.
         *
         * @param e the timer event, or null when invoked directly
         */
        public function ioTimerTick(e:TimerEvent):void {
            // Stop the pending timer before dropping the reference: on the threshold
            // path it is still running and would otherwise fire a second time and
            // clear the state of a newer batch.
            if (ioTimer != null) {
                ioTimer.stop();
                ioTimer.removeEventListener(TimerEvent.TIMER, ioTimerTick);
                ioTimer = null;
            }
            ioCount = 0;
            if (socket.connected) socket.flush();
        }

        /** Timer handler for the auto-reconnect timer: attempts to connect again. */
        public function doConnectTimer( event:TimerEvent ):void {
            doConnect();
        }

        /**
         * Stores the connection parameters and starts connecting.
         *
         * @param _server   host name or address of the STOMP server
         * @param _port     TCP port of the STOMP server
         * @param _login    login sent in the CONNECT frame (null is sent as an empty value)
         * @param _password passcode sent in the CONNECT frame (null is sent as an empty value)
         * @return always true
         */
        public function connect( _server:String, _port:int, _login:String = null, _password:String = null ):Boolean {
            if (protocolConnected) return true; // should we return false, since you can't connect? seems silly
            if (socket.connected) disconnect();
            server = _server; // was assigned from an undefined identifier instead of the parameter
            port = _port;
            // null would be concatenated into the CONNECT frame as the text "null"
            _stompLogin = (_login != null) ? _login : "";
            _stompPassword = (_password != null) ? _password : "";
            doConnect();
            return true;
        }

        /**
         * Sends a DISCONNECT frame (when the socket is open), closes the socket
         * and runs the close handling.
         * NOTE(review): this also switches useIOTimer off and never restores it.
         *
         * @return always true
         */
        public function disconnect():Boolean {
            trace('doDisconnect');
            useIOTimer = false;
            expectDisconnect = true;
            // an explicit disconnect must also cancel a pending auto-reconnect
            if (connectTimer != null) connectTimer.stop();
            if (socket.connected) {
                // writing to a socket that is not open throws, so only send when connected
                SendString("DISCONNECT\n\n\u0000");
                // todo: Add a timer here - we shouldn't hang up on the other end until they've had a
                // chance to hang up on us first
                socket.close();
            }
            onClose(null); // the close event doesn't fire if you close the socket yourself
                           // but we still want the cleanup to happen so we fire it manually
            return true;
        }

        /**
         * Loads the socket policy file and opens the socket to the stored
         * server/port, then dispatches a STOMPEvent.ConnectEvent.
         * Does nothing when the socket is already flagged as connected.
         */
        private function doConnect():void {
            if (socketConnected == true) return;
            Security.loadPolicyFile('xmlsocket://' + server + ':61614'); // this connects to a perl scriptlet
                                                                         // that returns a valid policyfile
            var EventObj:STOMPMsg = new STOMPMsg;
            try {
                socket.connect( server, port ); // this is where the connection actually happens
            } catch (e:Object) {
                trace('STOMP Connection - exception:' + e);
            }
            socketConnected = false;
            protocolConnected = false;
            protocolPending = true;
            expectDisconnect = false;
            // tracer.trace('doConnect() done');
            dispatchEvent(new STOMPEvent(STOMPEvent.ConnectEvent,EventObj));
        }

        /**
         * Socket CONNECT handler: stops the reconnect timer and sends the STOMP
         * CONNECT frame carrying the login and passcode headers.
         */
        private function onConnect( event:Event ):void {
            trace('onConnect');
            if (connectTimer != null) {
                connectTimer.stop();
            }
            // the header name must be "login"; it was misspelled
            SendString("CONNECT\nlogin: " + _stompLogin + "\npasscode: " + _stompPassword + "\n\n\n\u0000");
            socketConnected = true;
        }

        private function SendString( str:String ):void {
            // this function sends a UTF string down the socket.
// I split this off into a separate function in case we needed to change the behavior or the encoding
            // scheme later
            trace("SendString to socket: " + str );
            socket.writeUTFBytes(str);
            ioFlush(); // this class member cleanly handles flushing or not as appropriate
            //socket.flush(); // it's necessary to flush the socket after every transmission or the data will
                              // linger in the outgoing buffer until it reaches the MTU
        }

        /**
         * Close handling. Fired as an event when the other end closes the socket,
         * or called manually (with a null event) by disconnect().
         * Resets the connection flags, marks every subscription as unsubscribed,
         * then either arms the auto-reconnect timer (unexpected close with
         * autoReconnect enabled) or drops all subscriptions, and finally dispatches
         * a STOMPEvent.DisconnectEvent.
         *
         * @param event the close event, or null when invoked manually
         */
        private function onClose( event:Event ):void {
            //trace('onClose');
            socketConnected = false;
            protocolConnected = false;
            protocolPending = false;
            disconnectTime = new Date();

            // this loop sets all the subscriptions unsubscribed
            for (var i:int = 0; i < subscriptions.length; i++) {
                subscriptions[i].subscribed = false;
            }

            // Cancel a reconnect timer left over from an earlier close. Without this an
            // expected close that happens while reconnecting would still be followed by
            // a reconnect, and repeated unexpected closes would stack repeating timers.
            if (connectTimer != null) {
                connectTimer.stop();
                connectTimer.removeEventListener(TimerEvent.TIMER, doConnectTimer);
                connectTimer = null;
            }

            // If we didn't expect a disconnect, and we're set to autoreconnect,
            // we try to connect again
            if ((expectDisconnect == false) && (autoReconnect == true)) {
                connectTimer = new Timer(MEDIUM_INT);
                connectTimer.addEventListener(TimerEvent.TIMER, doConnectTimer);
                connectTimer.start();
            } else {
                subscriptions = new Array();
            }

            // send an event notifying of the disconnected state
            var EventObj:STOMPMsg = new STOMPMsg;
            dispatchEvent(new STOMPEvent(STOMPEvent.DisconnectEvent,EventObj));
        }

        private function onIOError( event:IOErrorEvent ):void {
            // this function gets called if an IO error occurs
            // generally this is a failure to connect
            // but it can also be caused by enough dropped packets in-session
            trace('onIOError');
            var now:Date = new Date();
            if (!socket.connected) {
                socketConnected = false;
                protocolConnected = false;
                protocolPending = false;
                disconnectTime = now;
            }
            // forward the error to every subscription
            for (var i:int = 0; i < subscriptions.length; i++) {
                subscriptions[i].onIOError( event );
            }
            errorMessages.push(now+" "+event.text);
            // legacy code that I don't use
            // but am not removing because I'm
not sure // if anything depends on it dispatchEvent(new STOMPEvent(STOMPEvent.ioErrorEvent,null)); } private function onSecurityError( event:SecurityErrorEvent ):void { trace('onSecurityError'); var now:Date = new Date(); if (!socket.connected) { socketConnected = false; protocolConnected = false; protocolPending = false; disconnectTime = now; } for (var i:int = 0; i < subscriptions.length; i++) { subscriptions[i].onSecurityError( event ); } errorMessages.push(now+" "+event.text); // legacy code that I don't use // but am not removing because I'm not sure // if anything depends on it dispatchEvent(new STOMPEvent(STOMPEvent.securityErrorEvent,null)); } public function subscribe( sub:STOMPSubscription ):Boolean { var i:int; for(i=0;i 0) { ProcessHeader(); // this processes the header block, getting things like content-length if it's a bytes message break; // this may change the input state } } if(headers[headerIndex].length > 0) { // if we've gotten here, then we need to create a new array element to accept // more incoming header information headerIndex++; headers[headerIndex] = new String(); } // end of if length > 0 } else { // if byte is not a newline headers[headerIndex] += b; //we add it to the array slice that is the current header line // uncomment this if you're debugging header reception // tracer.trace('headers - index: ' + headerIndex + ' header: ' + headers[headerIndex]); } /* if anyone knows of a better way to do this I'd love to hear it */ break; case 1: // BEGIN getting binary data if(inputBuffer.bytesAvailable > objectBytesToRead) // if we have more data in the input buffer than the size // of the next message readLength = objectBytesToRead; // then we only want to read the size of the next message else // otherwise readLength = inputBuffer.bytesAvailable; // we want to read all the data we have // uncomment these if trying to debug receiving binary data // tracer.traceMe('bef - readLength: ' + readLength + ' ob pos:' + objectBuffer.position + 'ob 
len:' + objectBuffer.length); // tracer.traceMe("ob-write " + inputBuffer.position + " " + readLength + " " + inputBuffer.length); objectBuffer.writeBytes(inputBuffer,inputBuffer.position,readLength); // write the incoming binary data // into the object buffer inputBuffer.position = inputBuffer.position + readLength; // update the input buffer position objectBytesToRead -= readLength; // update the counter tracking number of bytes in the current // bytes message left to be read if(objectBytesToRead == 0) { // if we have read all the bytes in the current message inputState = 4; // set the state to getting tail // tracer.traceMe('started dispatching'); // send message to interested parties about message having been received DispatchMessage(false); // false because binary data ResetIncoming(); // reset variables to prepare for another incoming message // tracer.traceMe('done dispatching'); } else { // tracer.traceMe("objBytesToRead:" + objectBytesToRead); } break; case 2: // BEGIN getting text data b = inputBuffer.readUTFBytes(1); // just like the headers we read the text data // one line at a time // uncomment if debugging reception of text messages // tracer.trace('text: ' + b); if(!b) { // if we get a null, it is the end of the frame //tracer.trace('text dispatch'); DispatchMessage(true); // true because text data ResetIncoming(); // reset variables to prepare for another incoming message inputState = 0; // set input state back to beginning break; } else { // we didn't get a null textBuffer = textBuffer.concat(b); // add byte to text buffer } break; case 4: // BEGIN getting tail // the 'tail' is comprised of \r, \n, \u0000 in some combination b = inputBuffer.readUTFBytes(1); // we read the tail a byte at a time if(b && (b != '\n')) // if the byte is neither null nor \n { // tracer.traceMe('done getting tail!:'+b + ' pos: ' + inputBuffer.position + ' len:' + inputBuffer.length); inputBuffer.position = inputBuffer.position - 1; // we put the byte back on the stack // 
by changing the input buffer position inputState = 0; // we return to the beginning of the process to handle the next message break; } break; // tracer.traceMe("end of fetch obj"); } // END switch statement } // END while loop //tracer.trace('end - ba: ' + inputBuffer.bytesAvailable + ' state: ' + inputState + ' len:' + inputBuffer.length + 'pos: ' + inputBuffer.position); // when we get here, we're done processing the input buffer and resetting it to a new object // makes the old object get reaped thus preventing memory leaks inputBuffer = new ByteArray(); } // END function private function ResetIncoming():void { // this function gets called before each incoming message (i.e. at the beginning and then once // after each message is received) to prepare variables for a new message to be received headers = new Array(); headers[0] = new String(); headerIndex = 0; objectBytesToRead = 0; objectBuffer = new ByteArray(); textBuffer = new String(); } private function DispatchMessage(bText:Boolean):void { // this sends a event to anyone listening about received messages var i:Number; var Found:Boolean; if(bText) { stompMsg.messageText = textBuffer; stompMsg.messageObject = null; } else { objectBuffer.position = 0; stompMsg.messageBytes = objectBuffer; stompMsg.messageBytes.position = 0; } // as of v0.03 or so, we send the event to the subscription for the channel that // the event message was received on for(i=0;i