`
pjwqq
  • 浏览: 79752 次
社区版块
存档分类
最新评论

最简Android推送客户端模型

阅读更多

     最简单的 Android 推送客户端模型,特点:读写使用同一线程,逻辑非常简单;支持断线重连、心跳、发消息等。要求不高的场景基本可以凑合使用。

 

package com.example.nbctts;

import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketTimeoutException;
import java.util.Iterator;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;

import android.app.Activity;
import android.os.Bundle;
import android.os.Handler;
import android.os.Message;
import android.view.View;
import android.view.View.OnClickListener;
import android.widget.Button;
import android.widget.EditText;
import android.widget.TextView;

import com.example.nbctts.model.ITask;
import com.example.nbctts.model.RcvMsg;
import com.example.nbctts.model.SendMsg;
import com.example.nbctts.model.Status;
import com.google.gson.Gson;

/*
 * 1.Timer控制任务投放节奏
 * 2.用单线程池控制任务线程顺序执行
 * 3.用socket.setSoTimeout(timeout)及SocketTimeoutException控制阻塞读时间
 */
public class MainActivity extends Activity {

	// UI widgets; all three are looked up by id in onCreate.
	EditText msgEdt;
	Button sendBtn;
	TextView serverTxt;

	// Single-thread executor: tasks handed to it run one at a time, in submission order.
	protected final ExecutorService singleThreadExecutor = Executors
			.newSingleThreadExecutor();
	protected final BlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>();// pending task queue
	// Connection to the server; (re)created by connTask.
	protected Socket socket = null;
	// NOTE(review): rcvSocket is never assigned or read anywhere else in the visible code.
	protected Socket rcvSocket = null;
	protected BufferedReader mReader;// BufferedReader: reads messages coming from the server
	protected BufferedWriter mWriter;// BufferedWriter: writes messages going to the server
	// Serializes SendMsg to JSON and parses RcvMsg from JSON.
	protected Gson gson = new Gson();
	// Schedules the periodic receive/heartbeat tasks and the delayed reconnect attempts.
	protected Timer timer = new Timer();

	/**
	 * Reports whether a task of the given type is currently waiting in the queue.
	 *
	 * @param msgType task type id, compared with each queued task's {@code getID()}
	 * @return {@code true} if at least one queued task has that id, {@code false} otherwise
	 */
	private boolean isTaskExist(int msgType) {
		// Every queued Runnable is expected to also implement ITask.
		for (Runnable pending : queue) {
			if (((ITask) pending).getID() == msgType) {
				return true;
			}
		}
		return false;
	}

	/**
	 * Adds a task to the pending queue. Send-message tasks ({@code Status.MSG_H})
	 * are always queued; every other task type is queued only when no task of the
	 * same type is already waiting.
	 *
	 * <p>The method is synchronized because it is called from several threads
	 * (the UI thread, the {@code Timer} thread and the executor thread); without
	 * the lock two callers could both pass the "already queued?" check and insert
	 * duplicates.
	 *
	 * @param task the task to queue; must also implement {@code ITask}
	 */
	private synchronized void addQueue(Runnable task) {
		int id = ((ITask) task).getID();
		try {
			if (id == Status.MSG_H || !isTaskExist(id)) {
				queue.put(task);
			}
		} catch (InterruptedException e) {
			// Keep the interrupt visible to the calling thread instead of swallowing it.
			Thread.currentThread().interrupt();
			e.printStackTrace();
		}
	}

	/**
	 * Handler that receives status messages posted by the background tasks.
	 * Connection, login and server-push messages are appended to the log view;
	 * a failed connection report additionally schedules a delayed reconnect.
	 * Heartbeat, send and logout messages are currently ignored.
	 */
	public Handler myHandler = new Handler() {
		@Override
		public void handleMessage(Message msg) {
			final HandlerMsg payload = (HandlerMsg) msg.obj;
			final int what = msg.what;
			final boolean isConnReport = what == Status.CONN_H;

			// These three message types are the only ones shown to the user.
			if (isConnReport || what == Status.LOGIN_H
					|| what == Status.SVRMSG_H) {
				serverTxt.append(payload.content + "\n");
			}

			// Connection failed: try again after the configured interval.
			if (isConnReport && !payload.successFlag) {
				TimerTask reconnect = new TimerTask() {
					@Override
					public void run() {
						addQueue(new connTask());
					}
				};
				timer.schedule(reconnect, Status.getConnIntervel());
			}
		}
	};

	/**
	 * Sets up the screen, starts the task dispatcher thread, queues the first
	 * connection attempt, schedules the periodic receive/heartbeat tasks and
	 * wires the send button.
	 */
	@Override
	protected void onCreate(Bundle savedInstanceState) {
		super.onCreate(savedInstanceState);
		setContentView(R.layout.activity_main);
		msgEdt = (EditText) findViewById(R.id.msgEdt);
		sendBtn = (Button) findViewById(R.id.sendBtn);
		serverTxt = (TextView) findViewById(R.id.serverTxt);

		// Start the thread that moves queued tasks to the executor, then connect.
		TaskThread dispatcher = new TaskThread();
		dispatcher.start();
		addQueue(new connTask());

		// First run after 1000 ms, then repeatedly every Status.heartBeat:
		// poll for pushed messages and send a heartbeat (these could be split).
		TimerTask pollAndBeat = new TimerTask() {
			@Override
			public void run() {
				addQueue(new RcvTask());
				addQueue(new HeartBeatTask());
			}
		};
		timer.schedule(pollAndBeat, 1000, Status.heartBeat);

		// Echo the typed text into the log view and queue it for sending.
		OnClickListener onSend = new OnClickListener() {
			@Override
			public void onClick(View v) {
				String text = msgEdt.getText().toString();
				serverTxt.append("client:" + text + "\n");
				addQueue(new sendTask(text));
			}
		};
		sendBtn.setOnClickListener(onSend);
	}

	/**
	 * Payload placed in {@code Message.obj} for {@code myHandler}: a success
	 * flag plus the text to show.
	 */
	class HandlerMsg {
		// true when the reported operation succeeded
		private boolean successFlag;
		// text describing the outcome
		private String content;

		/**
		 * @param successFlag whether the reported operation succeeded
		 * @param content     text describing the outcome
		 */
		public HandlerMsg(boolean successFlag, String content) {
			this.content = content;
			this.successFlag = successFlag;
		}
	}

	/**
	 * Dispatcher thread: takes tasks off the pending queue and hands each one to
	 * the single-thread executor, which runs them one at a time in order.
	 *
	 * <p>{@code queue.take()} blocks while the queue is empty, so the thread
	 * sleeps instead of spinning. Interrupting the thread ends the loop.
	 */
	class TaskThread extends Thread {
		@Override
		public void run() {
			while (!isInterrupted()) {
				try {
					// Blocks until a task is available; no polling needed.
					singleThreadExecutor.execute(queue.take());
				} catch (InterruptedException e) {
					// Restore the flag so the loop condition sees it and the thread exits.
					Thread.currentThread().interrupt();
				}
			}
		}
	}

	/**
	 * Task 1: connect to the server.
	 *
	 * <p>Does nothing when {@code Status.isConnected} is already true. Otherwise
	 * it closes any socket left over from an earlier connection or attempt,
	 * opens a new one, creates the UTF-8 reader/writer, reports success through
	 * {@code myHandler} and queues a login task. On failure the new socket is
	 * closed, the connection/login flags are cleared, the retry counter is
	 * incremented and a failure is reported (the handler then schedules a retry).
	 */
	class connTask implements Runnable, ITask {
		@Override
		public void run() {
			if (Status.isConnected) {
				return;
			}
			// Release the previous socket before replacing it, so repeated
			// reconnects do not leak file descriptors.
			closeQuietly(socket);
			socket = new Socket();
			try {
				socket.setKeepAlive(true);
				// Bounds how long a blocking read may wait before timing out.
				socket.setSoTimeout(Status.readBlock);
				socket.connect(
						new InetSocketAddress("169.169.43.56", 9999),
						Status.connTimeout);
				mReader = new BufferedReader(new InputStreamReader(
						socket.getInputStream(), "utf-8"));
				mWriter = new BufferedWriter(new OutputStreamWriter(
						socket.getOutputStream(), "utf-8"));
				Status.isConnected = true;
				Status.reConNum = 0;
				Message msg = new Message();
				msg.what = Status.CONN_H;
				msg.obj = new HandlerMsg(true, "connTask:已连接到服务器");
				myHandler.sendMessage(msg);
				// Connected: try to log in next.
				queue.add(new loginTask());
			} catch (IOException e) {
				// The attempt failed; do not keep the half-open socket around.
				closeQuietly(socket);
				Status.isConnected = false;
				Status.isLogin = false;
				Status.reConNum++;
				Message msg = new Message();
				msg.what = Status.CONN_H;
				msg.obj = new HandlerMsg(false, "connTask:无法连接到服务器");
				myHandler.sendMessage(msg);
			}
		}

		/** Closes the socket if present; a failure to close is ignored. */
		private void closeQuietly(Socket s) {
			if (s == null) {
				return;
			}
			try {
				s.close();
			} catch (IOException ignored) {
				// Nothing useful can be done if closing a dead socket fails.
			}
		}

		@Override
		public int getID() {
			return Status.CONN_H;
		}
	}

	/**
	 * Task 2: log in to the server.
	 *
	 * <p>Runs only when {@code Status.isConnected} is true. Sends a LOGIN
	 * request as one JSON line, reads one reply line and treats a reply whose
	 * content is {@code "ok"} as success. The outcome is reported through
	 * {@code myHandler}. If the server closes the stream before replying, or any
	 * other I/O error occurs, the connection is marked as lost and a connection
	 * failure is reported. A read timeout is only logged.
	 */
	class loginTask implements Runnable, ITask {
		@Override
		public void run() {
			if (!Status.isConnected) {
				return;
			}
			try {
				SendMsg sm = new SendMsg();
				sm.setOpt("LOGIN");
				sm.setUser(Status.username);
				sm.setPassword(Status.password);
				String s = gson.toJson(sm, SendMsg.class);
				mWriter.write(s + "\n");
				mWriter.flush();
				// Server reply
				String rs = mReader.readLine();
				if (rs == null) {
					// End of stream: the server closed the connection.
					throw new IOException(
							"connection closed before login reply");
				}
				RcvMsg rm = gson.fromJson(rs.trim(), RcvMsg.class);
				// rm is null for an empty reply; content may be absent.
				String reply = (rm == null) ? null : rm.getContent();
				boolean loggedIn = "ok".equals(reply);
				Status.isLogin = loggedIn;
				Message msg = new Message();
				msg.what = Status.LOGIN_H;
				if (loggedIn) {
					msg.obj = new HandlerMsg(true, "loginTask:已登录到服务器");
				} else {
					msg.obj = new HandlerMsg(false, "loginTask:登录到服务器失败:"
							+ reply);
				}
				myHandler.sendMessage(msg);
			} catch (SocketTimeoutException e) {
				e.printStackTrace();
			} catch (IOException e) {
				Status.isConnected = false;
				Status.isLogin = false;
				Message msg = new Message();
				msg.what = Status.CONN_H;
				msg.obj = new HandlerMsg(false, "loginTask:无法连接到服务器");
				myHandler.sendMessage(msg);
			}
		}

		@Override
		public int getID() {
			return Status.LOGIN_H;
		}
	}

	/**
	 * Task 3: receive messages pushed by the server.
	 *
	 * <p>Runs only when connected and logged in. Reads JSON lines until the
	 * socket read times out and forwards every message whose opt is
	 * {@code "SVRMSG"} to {@code myHandler}. The read timeout is the normal way
	 * out of the loop. End of stream (the server closed the connection) and any
	 * other I/O error mark the connection as lost and report a connection
	 * failure, which triggers the reconnect logic in the handler.
	 */
	class RcvTask implements Runnable, ITask {
		@Override
		public void run() {
			if (!(Status.isConnected && Status.isLogin)) {
				return;
			}
			try {
				String line;
				while ((line = mReader.readLine()) != null) {
					RcvMsg rm = gson.fromJson(line, RcvMsg.class);
					// Only pushed server messages are handled for now.
					if (rm != null && "SVRMSG".equals(rm.getOpt())) {
						Message msg = new Message();
						msg.what = Status.SVRMSG_H;
						msg.obj = new HandlerMsg(true, rm.getContent());
						myHandler.sendMessage(msg);
					}
				}
				// readLine() returned null: the peer closed the stream. Treat
				// it like any other lost connection instead of ignoring it.
				throw new IOException("connection closed by server");
			} catch (SocketTimeoutException expected) {
				// No more data within the read timeout; this poll is done.
			} catch (IOException e) {
				Status.isConnected = false;
				Message msg = new Message();
				msg.what = Status.CONN_H;
				msg.obj = new HandlerMsg(false, "RcvTask:无法连接到服务器");
				myHandler.sendMessage(msg);
			}
		}

		@Override
		public int getID() {
			return Status.SVRMSG_H;
		}
	}

	/**
	 * Task 4: heartbeat. When connected and logged in, writes one HEARTBEAT
	 * request as a JSON line. A write failure marks the connection as lost and
	 * reports a connection failure through {@code myHandler}.
	 */
	class HeartBeatTask implements Runnable, ITask {
		@Override
		public void run() {
			boolean ready = Status.isConnected && Status.isLogin;
			if (!ready) {
				return;
			}
			try {
				SendMsg beat = new SendMsg();
				beat.setOpt("HEARTBEAT");
				beat.setUser(Status.username);
				beat.setPassword(Status.password);
				String json = gson.toJson(beat, SendMsg.class);
				mWriter.write(json + "\n");
				mWriter.flush();
			} catch (IOException e) {
				Status.isConnected = false;
				Message report = new Message();
				report.what = Status.CONN_H;
				report.obj = new HandlerMsg(false, "HeartBeatTask:无法连接到服务器");
				myHandler.sendMessage(report);
			}
		}

		@Override
		public int getID() {
			return Status.HEARTBEAT_H;
		}
	}

	/**
	 * Task 5: send a user message.
	 *
	 * <p>When connected and logged in, writes one MSG request as a JSON line; a
	 * write failure marks the connection as lost and reports a connection
	 * failure. Otherwise the message is not sent and recovery is started
	 * immediately: a connect task if there is no connection, or a login task if
	 * the connection exists but login has not succeeded (a connect task would be
	 * a no-op in that state).
	 */
	class sendTask implements Runnable, ITask {
		final String str;

		/**
		 * @param str message text to send
		 */
		public sendTask(String str) {
			this.str = str;
		}

		@Override
		public void run() {
			if (Status.isConnected && Status.isLogin) {
				try {
					SendMsg sm = new SendMsg();
					sm.setOpt("MSG");
					sm.setUser(Status.username);
					sm.setPassword(Status.password);
					sm.setTargetUser("");// placeholder for now
					sm.setContent(str);
					String s = gson.toJson(sm, SendMsg.class);
					mWriter.write(s + "\n");
					mWriter.flush();
				} catch (IOException e) {
					Status.isConnected = false;
					Message msg = new Message();
					msg.what = Status.CONN_H;
					msg.obj = new HandlerMsg(false, "sendTask:无法连接到服务器");
					myHandler.sendMessage(msg);
				}
			} else {
				// Not ready to send: start recovery right away.
				Status.reConNum = 0;
				if (Status.isConnected) {
					// Connected but not logged in: connTask would do nothing
					// here, so retry the login instead.
					addQueue(new loginTask());
				} else {
					addQueue(new connTask());
				}
			}
		}

		@Override
		public int getID() {
			return Status.MSG_H;
		}
	}

}

 

 

界面不贴,很简单一个输入框输消息内容,一个按钮发消息,一个文本框记录log及服务端推过来的消息,只是一个测试demo。

 

经测试:断线重连,连接后自动登录,心跳,发消息都没问题,就是接收服务端推送时有延时,这个没办法,毕竟是间隔读,其实也可以把间隔搞小一点。

一般推送对实时性要求不是很高吧,所以基本上可以用。

其实用 BlockingQueue 做成和 asmack 那样也可以,我这个也算是一种思路,记录之。

 

0
0
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics