  1. #1
    Regular Contributor
    Join Date
    Dec 2012
    Location
    Indonesia,DKI Jakarta
    Posts
    87

    Question ArrayBlockingQueue

    Hello All,

    I have a problem with an abstract data type (ADT) in J2ME: it has no concurrent ArrayBlockingQueue. In the Smack (Openfire) source, the PacketWriter class uses an ArrayBlockingQueue to queue every packet that will be sent, but that code targets J2SE. So how can I replace or convert the J2SE ArrayBlockingQueue for J2ME? Here is the PacketWriter class:

    Code:
    /**
     * $RCSfile$
     * $Revision: 11613 $
     * $Date: 2010-02-09 05:55:56 -0600 (Tue, 09 Feb 2010) $
     *
     * Copyright 2003-2007 Jive Software.
     *
     * All rights reserved. Licensed under the Apache License, Version 2.0 (the "License");
     * you may not use this file except in compliance with the License.
     * You may obtain a copy of the License at
     *
     *     http://www.apache.org/licenses/LICENSE-2.0
     *
     * Unless required by applicable law or agreed to in writing, software
     * distributed under the License is distributed on an "AS IS" BASIS,
     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     * See the License for the specific language governing permissions and
     * limitations under the License.
     */
    
    package org.jivesoftware.smack;
    
    import org.jivesoftware.smack.packet.Packet;
    
    import java.io.IOException;
    import java.io.Writer;
    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.BlockingQueue;
    
    /**
     * Writes packets to a XMPP server. Packets are sent using a dedicated thread.
     * Packet interceptors can be registered to dynamically modify packets before
     * they're actually sent. Packet listeners can be registered to listen for all
     * outgoing packets.
     * 
     * @see Connection#addPacketInterceptor
     * @see Connection#addPacketSendingListener
     * 
     * @author Matt Tucker
     */
    class PacketWriter {
    
    	private Thread writerThread;
    	private Thread keepAliveThread;
    	private Writer writer;
    	private XMPPConnection connection;
    	private final BlockingQueue<Packet> queue;
    	private boolean done;
    
    	/**
    	 * Timestamp when the last stanza was sent to the server. This information
    	 * is used by the keep alive process to only send heartbeats when the
    	 * connection has been idle.
    	 */
    	private long lastActive = System.currentTimeMillis();
    
    	/**
    	 * Creates a new packet writer with the specified connection.
    	 * 
    	 * @param connection
    	 *            the connection.
    	 */
    	protected PacketWriter(XMPPConnection connection) {
    		this.queue = new ArrayBlockingQueue<Packet>(500, true);
    		this.connection = connection;
    		init();
    	}
    
    	/**
    	 * Initializes the writer in order to be used. It is called at the first
    	 * connection and also is invoked if the connection is disconnected by an
    	 * error.
    	 */
    	protected void init() {
    		this.writer = connection.writer;
    		done = false;
    
    		writerThread = new Thread() {
    			public void run() {
    				writePackets(this);
    			}
    		};
    		writerThread.setName("Smack Packet Writer ("
    				+ connection.connectionCounterValue + ")");
    		writerThread.setDaemon(true);
    	}
    
    	/**
    	 * Sends the specified packet to the server.
    	 * 
    	 * @param packet
    	 *            the packet to send.
    	 */
    	public void sendPacket(Packet packet) {
    		if (!done) {
    			// Invoke interceptors for the new packet that is about to be sent.
    			// Interceptors may modify the content of the packet.
    			connection.firePacketInterceptors(packet);
    
    			try {
    				queue.put(packet);
    			} catch (InterruptedException ie) {
    				ie.printStackTrace();
    				return;
    			}
    			synchronized (queue) {
    				queue.notifyAll();
    			}
    
    			// Process packet writer listeners. Note that we're using the
    			// sending thread, so it's expected that listeners are fast.
    			connection.firePacketSendingListeners(packet);
    		}
    	}
    
    	/**
    	 * Starts the packet writer thread and opens a connection to the server. The
    	 * packet writer will continue writing packets until {@link #shutdown} or an
    	 * error occurs.
    	 */
    	public void startup() {
    		writerThread.start();
    	}
    
    	/**
    	 * Starts the keep alive process. A white space (aka heartbeat) is going to
    	 * be sent to the server every 30 seconds (by default) since the last stanza
    	 * was sent to the server.
    	 */
    	void startKeepAliveProcess() {
    		// Schedule a keep-alive task to run if the feature is enabled. It will
    		// write out a space character each time it runs to keep the TCP/IP
    		// connection open.
    		int keepAliveInterval = SmackConfiguration.getKeepAliveInterval();
    		if (keepAliveInterval > 0) {
    			KeepAliveTask task = new KeepAliveTask(keepAliveInterval);
    			keepAliveThread = new Thread(task);
    			task.setThread(keepAliveThread);
    			keepAliveThread.setDaemon(true);
    			keepAliveThread.setName("Smack Keep Alive ("
    					+ connection.connectionCounterValue + ")");
    			keepAliveThread.start();
    		}
    	}
    
    	void setWriter(Writer writer) {
    		this.writer = writer;
    	}
    
    	/**
    	 * Shuts down the packet writer. Once this method has been called, no
    	 * further packets will be written to the server.
    	 */
    	public void shutdown() {
    		done = true;
    		synchronized (queue) {
    			queue.notifyAll();
    		}
    	}
    
    	/**
    	 * Cleans up all resources used by the packet writer.
    	 */
    	void cleanup() {
    		connection.interceptors.clear();
    		connection.sendListeners.clear();
    	}
    
    	/**
    	 * Returns the next available packet from the queue for writing.
    	 * 
    	 * @return the next packet for writing.
    	 */
    	private Packet nextPacket() {
    		Packet packet = null;
    		// Wait until there's a packet or we're done.
    		while (!done && (packet = queue.poll()) == null) {
    			try {
    				synchronized (queue) {
    					queue.wait();
    				}
    			} catch (InterruptedException ie) {
    				// Do nothing
    			}
    		}
    		return packet;
    	}
    
    	private void writePackets(Thread thisThread) {
    		try {
    			// Open the stream.
    			openStream();
    			// Write out packets from the queue.
    			while (!done && (writerThread == thisThread)) {
    				Packet packet = nextPacket();
    				if (packet != null) {
    					synchronized (writer) {
    						writer.write(packet.toXML());
    						writer.flush();
    						// Keep track of the last time a stanza was sent to the
    						// server
    						lastActive = System.currentTimeMillis();
    					}
    				}
    			}
    			// Flush out the rest of the queue. If the queue is extremely large,
    			// it's possible we won't have time to entirely flush it before the
    			// socket is forced closed by the shutdown process.
    			try {
    				synchronized (writer) {
    					while (!queue.isEmpty()) {
    						Packet packet = queue.remove();
    						writer.write(packet.toXML());
    					}
    					writer.flush();
    				}
    			} catch (Exception e) {
    				e.printStackTrace();
    			}
    
    			// Delete the queue contents (hopefully nothing is left).
    			queue.clear();
    
    			// Close the stream.
    			try {
    				writer.write("</stream:stream>");
    				writer.flush();
    			} catch (Exception e) {
    				// Do nothing
    			} finally {
    				try {
    					writer.close();
    				} catch (Exception e) {
    					// Do nothing
    				}
    			}
    		} catch (IOException ioe) {
    			if (!done) {
    				done = true;
    				connection.packetReader.notifyConnectionError(ioe);
    			}
    		}
    	}
    
    	/**
    	 * Sends to the server a new stream element. This operation may be requested
    	 * several times so we need to encapsulate the logic in one place. This
    	 * message will be sent while doing TLS, SASL and resource binding.
    	 * 
    	 * @throws IOException
    	 *             If an error occurs while sending the stanza to the server.
    	 */
    	void openStream() throws IOException {
    		StringBuilder stream = new StringBuilder();
    		stream.append("<stream:stream");
    		stream.append(" to=\"").append(connection.getServiceName())
    				.append("\"");
    		stream.append(" xmlns=\"jabber:client\"");
    		stream.append(" xmlns:stream=\"http://etherx.jabber.org/streams\"");
    		stream.append(" version=\"1.0\">");
    		writer.write(stream.toString());
    		writer.flush();
    	}
    
    	/**
    	 * A Runnable that keeps connections to the server alive by sending a space
    	 * character on an interval.
    	 */
    	private class KeepAliveTask implements Runnable {
    
    		private int delay;
    		private Thread thread;
    
    		public KeepAliveTask(int delay) {
    			this.delay = delay;
    		}
    
    		protected void setThread(Thread thread) {
    			this.thread = thread;
    		}
    
    		public void run() {
    			try {
    				// Sleep 15 seconds before sending the first heartbeat. This will
    				// give time to properly finish TLS negotiation and then start
    				// sending heartbeats.
    				Thread.sleep(15000);
    			} catch (InterruptedException ie) {
    				// Do nothing
    			}
    			while (!done && keepAliveThread == thread) {
    				synchronized (writer) {
    					// Send heartbeat if no packet has been sent to the server
    					// for a given time
    					if (System.currentTimeMillis() - lastActive >= delay) {
    						try {
    							writer.write(" ");
    							writer.flush();
    						} catch (Exception e) {
    							// Do nothing
    						}
    					}
    				}
    				try {
    					// Sleep until we should write the next keep-alive.
    					Thread.sleep(delay);
    				} catch (InterruptedException ie) {
    					// Do nothing
    				}
    			}
    		}
    	}
    }
    Please, if anyone can help...

  2. #2
    Nokia Developer Moderator
    Join Date
    Feb 2006
    Location
    Oslo, Norway
    Posts
    28,750

    Re: ArrayBlockingQueue

    You won't actually have to make a decision. Check http://docs.oracle.com/javame/config...e-summary.html, and you will see that Vector is the only container class you have, so you may want to use that one. To make it thread-safe, wrap each access in a synchronized block, or derive your own FIFO class and make its methods synchronized.
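
    A minimal sketch of the second option (a FIFO class derived from Vector with synchronized methods). The class name SynchronizedFifo and the method names push/pop are hypothetical, and elements are plain Objects because CLDC 1.1 predates generics. This version does not block: pop() simply returns null when nothing is queued.

```java
import java.util.Vector;

// Hypothetical class name: a non-blocking FIFO derived from Vector.
// The raw Vector type is deliberate, since CLDC 1.1 has no generics.
class SynchronizedFifo extends Vector {

    // Appends an item at the tail of the queue.
    public synchronized void push(Object item) {
        addElement(item);
    }

    // Removes and returns the head of the queue, or null when it is empty.
    // The emptiness check and the removal happen under one lock, so no
    // other thread can empty the queue between the two steps.
    public synchronized Object pop() {
        if (isEmpty()) {
            return null;
        }
        Object head = firstElement();
        removeElementAt(0);
        return head;
    }
}
```

    Note that deriving from Vector leaves all of Vector's other public methods (insertElementAt(), removeAllElements() and so on) callable on the queue.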

  3. #3
    Nokia Developer Champion
    Join Date
    Mar 2013
    Posts
    686

    Re: ArrayBlockingQueue

    Small correction: AFAIK Vector is already thread-safe (the legacy containers Vector, Stack and Hashtable all have synchronized methods, so Hashtable is available as a container too), so there is no need to synchronize its individual calls.
    However, you will need to make it a BLOCKING queue, which means you will need to wait() in the dequeue method and notifyAll() in the enqueue method, so those methods will be synchronized no matter what.
    So just write a class that contains a Vector. (I prefer having class A contain a member of class B over having class A inherit from class B, because that way you decide which methods are visible and the rest of Vector's public methods cannot be called.) In enqueue, simply call addElement(); in dequeue, call firstElement() and then removeElementAt(0) to get the first element in the queue and remove it.
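
    The description above could be sketched roughly as follows. The class name BlockingVectorQueue and the method names enqueue/dequeue are assumptions for illustration, not part of any library, and elements are plain Objects because CLDC 1.1 has no generics.

```java
import java.util.Vector;

// Hypothetical class name: an unbounded blocking FIFO that contains a
// Vector instead of inheriting from it, so only these methods are visible.
class BlockingVectorQueue {

    private final Vector items = new Vector();

    // Adds an item at the tail and wakes any thread waiting in dequeue().
    public synchronized void enqueue(Object item) {
        items.addElement(item);
        notifyAll();
    }

    // Blocks until an item is available, then removes and returns the head.
    public synchronized Object dequeue() throws InterruptedException {
        // Re-check the condition in a loop: a woken thread may find that
        // another consumer already took the item.
        while (items.isEmpty()) {
            wait();
        }
        Object head = items.firstElement();
        items.removeElementAt(0);
        return head;
    }

    // Returns the number of queued items.
    public synchronized int size() {
        return items.size();
    }
}
```

    A consumer thread would call dequeue() in its loop and cast the result, for example (Packet) queue.dequeue(), while producers call enqueue().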

  4. #4
    Regular Contributor
    Join Date
    Dec 2012
    Location
    Indonesia,DKI Jakarta
    Posts
    87

    Re: ArrayBlockingQueue

    Hi Shai,

    So I don't need to write a linked-list implementation? I ask because, AFAIK, a linked list would be used instead of a Vector. Is Vector enough for thread safety?



  5. #5
    Regular Contributor
    Join Date
    Dec 2012
    Location
    Indonesia,DKI Jakarta
    Posts
    87

    Re: ArrayBlockingQueue

    Hi wizard_hu_,

    Yes, I have already written a linked-list implementation in my project, and I use that linked list instead of Vector.


  6. #6
    Nokia Developer Champion
    Join Date
    Mar 2013
    Posts
    686

    Re: ArrayBlockingQueue

    You don't have to use a LinkedList; you can use Vector in the way I described in #3.

  7. #7
    Regular Contributor
    Join Date
    Dec 2012
    Location
    Indonesia,DKI Jakarta
    Posts
    87

    Re: ArrayBlockingQueue

    Hi Shai,

    In my code I have a line like this: this.queue = new ArrayBlockingQueue<Packet>(500, true); How do I convert this ArrayBlockingQueue usage to J2ME?
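
    One possible way to approximate that line with CLDC classes is a bounded, Vector-backed queue. This is only a sketch under stated assumptions: the class name BoundedVectorQueue is hypothetical, and its methods merely mirror the names of the ArrayBlockingQueue methods that PacketWriter calls. The first constructor argument (500) maps to the capacity. The second argument (true) requests fair ordering of waiting threads, which plain wait()/notifyAll() does not guarantee, so it has no counterpart here.

```java
import java.util.Vector;

// Hypothetical class name: a bounded blocking FIFO built on Vector.
class BoundedVectorQueue {

    private final Vector items = new Vector();
    private final int capacity;

    BoundedVectorQueue(int capacity) {
        if (capacity <= 0) {
            throw new IllegalArgumentException("capacity must be positive");
        }
        this.capacity = capacity;
    }

    // Blocks while the queue is full, then appends the item at the tail.
    public synchronized void put(Object item) throws InterruptedException {
        while (items.size() >= capacity) {
            wait();
        }
        items.addElement(item);
        notifyAll(); // wake consumers waiting for an item
    }

    // Blocks while the queue is empty, then removes and returns the head.
    public synchronized Object take() throws InterruptedException {
        while (items.isEmpty()) {
            wait();
        }
        return removeHead();
    }

    // Returns the head immediately, or null when the queue is empty.
    public synchronized Object poll() {
        return items.isEmpty() ? null : removeHead();
    }

    public synchronized boolean isEmpty() {
        return items.isEmpty();
    }

    // Discards all queued items and wakes producers blocked in put().
    public synchronized void clear() {
        items.removeAllElements();
        notifyAll();
    }

    // Caller must hold the lock and must have checked that items exist.
    private Object removeHead() {
        Object head = items.firstElement();
        items.removeElementAt(0);
        notifyAll(); // a slot was freed; wake producers blocked in put()
        return head;
    }
}
```

    With such a class, the constructor line would become this.queue = new BoundedVectorQueue(500); and every value read from the queue would need a cast, for example (Packet) queue.poll().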



    Best Regards,
    Iqbal

  8. #8
    Nokia Developer Champion
    Join Date
    Mar 2013
    Posts
    686

    Re: ArrayBlockingQueue

    See my answer in #3.

  9. #9
    Regular Contributor
    Join Date
    Dec 2012
    Location
    Indonesia,DKI Jakarta
    Posts
    87

    Re: ArrayBlockingQueue

    OK Shai, I created that successfully. I am going to post my code here; maybe it can help other developers.
