Mina网络通信框架示例

Mina网络通信框架示例

一、功能

用 Mina 框架,同时实现服务器和客户端,实现客户端与客户端之间的 IM 通信,使用 Java JDBC 接口连入 MySQL 数据库。

二、数据库登录处理逻辑

package database;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;

import org.apache.mina.core.session.IoSession;

import util.MessageUtil;
import util.SessionUtil;
import common.Common;

/**
 * 管理客户端的数据库
 */
public class ClientJDBC {

    public ClientJDBC() {
        try {
            // 加载JDBC Driver
            Class.forName("com.mysql.jdbc.Driver");

        } catch (ClassNotFoundException e) {
            e.printStackTrace();
        }
    }

    /**
     * <pre>
     * 1、查询是否已存在此用户
     * 2、注册,添加新数据
     */
    public void addNewData(String name, String password, IoSession loginSession) {
        try {
            // 建立连接
            Connection connection = DriverManager.getConnection(
                    "jdbc:mysql://localhost:3306/clientdb", "root", "");

            // 创建Statement
            Statement statement = connection.createStatement();
            // 查询是否已存在此用户
            String selectSQL = "select password from client where name=" + "'"
                    + name + "'";
            ResultSet set = statement.executeQuery(selectSQL);
            // 如果不存在此用户
            if (!set.next()) {
                // 添加数据
                PreparedStatement preparedStatement = connection
                        .prepareStatement("insert into client values(null,?,?)");
                preparedStatement.setString(1, name);
                preparedStatement.setString(2, password);
                preparedStatement.executeUpdate();
                // 提示用户注册成功
                loginSession.write(Common.CANREG);
            } else {
                // 提示用户已注册
                loginSession.write(Common.CANTRE);
            }
            connection.close();
        } catch (SQLException e) {
            e.printStackTrace();
        }
    }

    /**
     * <pre>
     * 1、查询帐号密码是否匹配
     * 2、查询是否用户已登录
     * 3、登录
     */
    public boolean login(String name, String password, IoSession userSession) {
        try {
            // 建立连接
            Connection connection = DriverManager.getConnection(
                    "jdbc:mysql://localhost:3306/clientdb", "root", "");

            // 创建Statement
            Statement statement = connection.createStatement();

            // 查询是否已存在此用户
            String selectSQL = "select password from client where name=" + "'"
                    + name + "'";
            ResultSet set = statement.executeQuery(selectSQL);
            // 如果存在此用户
            if (set.next()) {
                // 密码匹配成功
                if (set.getString("password").equals(password)) {
                    // 如果用户没登录
                    if (SessionUtil.getSessionFromName(name) == null) {
                        // 存储用户名
                        userSession.setAttribute("name", name);
                        // 登录成功
                        String s = name;
                        String message = MessageUtil.createSendMessage(Common.LOGIN_SUCCESS, s);
                        userSession.write(message);
                        // 发送在线列表给所有客户端
                        SessionUtil.sendClientListToAll();
                    } else {
                        // 提示用户已登录
                        userSession.write(Common.IS_LOGIN);
                    }
                } else {
                    // 提示帐号和密码不匹配
                    userSession.write(Common.ISNT_MATCH);
                }
            } else {
                // 提示帐号和密码不匹配
                userSession.write(Common.ISNT_MATCH);
            }
            connection.close();
        } catch (SQLException e) {
            e.printStackTrace();
        }
        return true;
    }
}

服务器端Handler处理逻辑。
注意,此处继承的是StreamIoHandler,为了后续对传输文件进行操作。

package server;

import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import org.apache.mina.core.session.IdleStatus;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.handler.stream.StreamIoHandler;

import thread.IoStreamThreadWork;
import util.SessionUtil;
import common.Common;

/**
 * Mina Handler
 */
public class ServerHandler extends StreamIoHandler {

    @Override
    public void exceptionCaught(IoSession session, Throwable cause) {
        System.out.println(session.getRemoteAddress() + "   "
                + "exceptionCaught");
        System.out.println(cause);
    }

    /**
     * mina只能接收以换行符结束的信息
     */
    @Override
    public void messageReceived(IoSession session, Object message) {
        System.out.println(session.getRemoteAddress() + "   "
                + "messageReceived");

        // 解析信息
        String result = (String) message;

        // 注册信息
        if (result.startsWith(Common.REGISTER) && result.length() > 10) {
            String suffix = result.substring(10);
            String[] array = suffix.split(",");
            MinaServer.clientJDBC.addNewData(array[0], array[1], session);
        }
        // 登录信息
        else if (result.startsWith(Common.LOGIN) && result.length() > 10) {
            String suffix = result.substring(10);
            String[] array = suffix.split(",");
            MinaServer.clientJDBC.login(array[0], array[1], session);
        }
        // 登出信息
        else if (result.equals(Common.LOGOUT) && result.length() == 6) {
            // 关闭当前session
            session.close();
            // 发送在线列表给所有客户端
            SessionUtil.sendClientListToAll();
        }
        // 用户之间发送消息
        else if (result.startsWith(Common.CLIENT) && result.length() > 10) {
            String suffix = result.substring(10);
            // 分隔选中用户名和要发送的消息
            String[] array = suffix.split(Common.SEPARATOR);
            // 向指定用户发送消息
            if (session.getAttribute("name") != null)
                SessionUtil.sendToSelectedClient(
                        (String) session.getAttribute("name"), array[0],
                        array[1]);
        }
    }

    @Override
    public void messageSent(IoSession session, Object message) throws Exception {
        System.out.println(session.getRemoteAddress() + "   " + "messageSent");
    }

    @Override
    public void sessionClosed(IoSession session) throws Exception {
        System.out
                .println(session.getRemoteAddress() + "   " + "sessionClosed");
    }

    /**
     * 用户创建的时候保留会话的ip和端口
     */
    @Override
    public void sessionCreated(IoSession session) throws Exception {
        System.out.println(session.getRemoteAddress() + "   "
                + "sessionCreated");
        // 提取客户端ip和端口
        String s = session.getRemoteAddress() + "";
        String port = s.substring(s.indexOf(":") + 1);
        String ip = s.substring((s.indexOf("/") + 1),
                s.length() - 1 - port.length());
        // 存储ip和端口
        if (session.getAttribute("ip") == null)
            session.setAttribute("ip", ip);
        if (session.getAttribute("port") == null)
            session.setAttribute("port", port);
    }

    /**
     * 会话空闲状态
     */
    @Override
    public void sessionIdle(IoSession session, IdleStatus status) {
        System.out.println(session.getRemoteAddress() + "   " + "sessionIdle");
    }

    @Override
    public void sessionOpened(IoSession session) {
        System.out
                .println(session.getRemoteAddress() + "   " + "sessionOpened");
    }

    @Override
    protected void processStreamIo(IoSession session, InputStream input,
            OutputStream output) {
        System.out.println(session.getRemoteAddress() + "   "
                + "processStreamIo");
        // 设定一个线程池
        // 参数说明:最少数量3,最大数量6 空闲时间 3秒
        ThreadPoolExecutor threadPool = new ThreadPoolExecutor(3, 6, 3,
                TimeUnit.SECONDS,
                // 缓冲队列为3
                new ArrayBlockingQueue<Runnable>(3),
                // 抛弃旧的任务
                new ThreadPoolExecutor.DiscardOldestPolicy());
        FileOutputStream fos = null;
        // 此处路径如何动态设定。
        File receiveFile = new File("F:\\111.jks");

        try {
            fos = new FileOutputStream(receiveFile);
        } catch (FileNotFoundException e1) {
            e1.printStackTrace();
        } finally {
            try {
                fos.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        // 将线程放入线程池 当连接很多时候可以通过线程池处理
        threadPool.execute(new IoStreamThreadWork(input, fos));
    }
}

客户端Handler处理逻辑:

package client;

import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.InputStream;
import java.io.OutputStream;

import javax.swing.DefaultListModel;
import javax.swing.JOptionPane;
import javax.swing.JTextArea;

import org.apache.mina.core.session.IdleStatus;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.handler.stream.StreamIoHandler;

import thread.IoStreamThreadWork;

import common.Common;

import frame.ChatFrame;
import frame.LoginFrame;

/**
 * Mina Handler
 */
// IoHandlerAdapter
public class ClientHandler extends StreamIoHandler {

    @Override
    public void exceptionCaught(IoSession session, Throwable cause) {
        System.out.println(session.getLocalAddress() + "   "
                + "exceptionCaught");
        System.out.println(cause);
    }

    /**
     * mina只能接收以换行符结束的信息
     */
    @Override
    public void messageReceived(IoSession session, Object message) {
        System.out.println(session.getLocalAddress() + "   "
                + "messageReceived");

        // 解析信息
        String result = (String) message;

        // 注册成功
        if (result.equals(Common.CANREG) && result.length() == 6) {
            JOptionPane.showMessageDialog(null, "注册成功!");
        }
        // 用户已存在注册失败
        else if (result.equals(Common.CANTRE) && result.length() == 6) {
            JOptionPane.showMessageDialog(null, "用户已存在!");
        }
        // 用户已登录,登录失败
        else if (result.equals(Common.IS_LOGIN) && result.length() == 6) {
            JOptionPane.showMessageDialog(null, "用户已登录!");
            session.close();// 关闭这个Session
        }
        // 帐号和密码不匹配,登录失败
        else if (result.equals(Common.ISNT_MATCH) && result.length() == 6) {
            JOptionPane.showMessageDialog(null, "帐号和密码不匹配!");
            session.close();// 关闭这个Session
        }
        // 登录成功
        else if (result.startsWith(Common.LOGIN_SUCCESS)
                && result.length() > 10) {
            String suffix = result.substring(10);
            // 从Attribute中取出登录窗口
            LoginFrame frame = (LoginFrame) session.getAttribute("loginFrame");
            // 关闭登录窗口
            frame.dispose();
            // 打开聊天窗口
            new ChatFrame(session).setTitle("用户" + suffix + "的聊天窗口");
        }
        // 服务器发送在线列表消息
        else if (result.startsWith(Common.SERVER) && result.length() > 10) {
            String suffix = result.substring(10);
            String[] array = suffix.split(",");
            // 判断空,因为有loginSession
            if (session.getAttribute("clientListModel") != null) {
                // 取出clientListModel
                DefaultListModel<String> clientListModel = (DefaultListModel<String>) (session
                        .getAttribute("clientListModel"));
                // 清空原视图
                clientListModel.removeAllElements();
                // 重新加入在线用户列表
                for (String clientName : array)
                    clientListModel.addElement(clientName);
            }
        }
        // 客户发送过来的消息
        else if (result.startsWith(Common.CLIENT) && result.length() > 10) {
            String suffix = result.substring(10);
            // if (session.getAttribute("receivedMessage") != null)
            JTextArea jta = (JTextArea) session.getAttribute("receivedMessage");
            jta.append(suffix + "\n");
            // 滚动到底端
            jta.setCaretPosition(jta.getText().length());
        }
    }

    @Override
    public void messageSent(IoSession session, Object message) throws Exception {
        System.out.println(session.getLocalAddress() + "   " + "messageSent");
    }

    @Override
    public void sessionClosed(IoSession session) throws Exception {
        System.out.println(session.getLocalAddress() + "   " + "sessionClosed");
    }

    @Override
    public void sessionCreated(IoSession session) throws Exception {
        System.out
                .println(session.getLocalAddress() + "   " + "sessionCreated");
    }

    /**
     * 会话空闲状态
     */
    @Override
    public void sessionIdle(IoSession session, IdleStatus status) {
        System.out.println(session.getLocalAddress() + "   " + "sessionIdle");
    }

    @Override
    public void sessionOpened(IoSession session) {
        System.out.println(session.getLocalAddress() + "   " + "sessionOpened");
    }

    @Override
    protected void processStreamIo(IoSession session, InputStream input,
            OutputStream output) {
        System.out.println(session.getLocalAddress() + "   "
                + "processStreamIo");
        // 客户端发送文件
        File sendFile = new File("e:\\MyKey.jks");
        FileInputStream fis = null;
        try {
            fis = new FileInputStream(sendFile);

        } catch (FileNotFoundException e) {
            e.printStackTrace();
        }
        // 放入线程让其执行
        // 客户端一般都用一个线程实现即可 不用线程池
        new IoStreamThreadWork(fis, output).start();
        return;
    }
}

在 Mina 的使用过程中,应注意以下问题:

  • 1、客户端IoSession.setAttribute后,服务器IoSession.getAttribute为null,这两个session虽然getId一样,但不是同一个session。
  • 2、使用TextLineCodecFactory,messageReceived方法默认只能接收以换行符结束的信息。
  • 3、使用getAttribute时应注意判断空指针:if (session.getAttribute("name") != null && session.getAttribute("name").equals(name))。否则不会报错,但下面的程序段会终止执行。可在exceptionCaught方法中catch到异常
  • 4、每个客户端对应一个ConnectFuture,每个ConnectFuture对应一个IoSession。
  • 5、可以巧用session.setAttribute("loginFrame", LoginFrame.this);来进行窗口的传递,注意:如果在监听中直接传this,要注意是不是LoginFrame.this。
  • 6、mina不能动态修改acceptor的ProtocolCodecFilter和Handler。
  • 7、传输文件可通过StreamIoHandler和MessageEncoder的方式。

开源项目:
MiniWeChat-Server
minaSpring

评论

暂无

添加新评论