服务器在用户之间交替而不是广播 - java

我一直在一个消息系统上工作,其中用户键入服务器IP /端口,然后该服务器接收消息并将其中继给服务器上的所有其他用户。整个程序是基于我从头开始重写的echo服务器的,对于每个server.accept()套接字,它都会创建两个线程,一个用于接收消息,另一个用于将消息发回。这两个线程由DatagramPacket系统连接,因此,如果服务器从一个套接字接收到一条消息,则会将其发送回所有其他用户,因为他们的线程正在侦听同一件事,这就是我遇到问题的地方;一切正常,除了接收消息的用户为了登录时间而轮流使用。

连接两个客户端时的问题示例:

客户端1发送10条消息:

0
1
2
3
4
5
6
7
8
9

服务器接收所有这些。

客户#1收到:

1
3
5
7
9

2号客户收到:

0
2
4
6
8

这是客户端的代码:

import java.io.*;
import java.util.*;
import java.net.*;
import java.awt.*;
import java.awt.event.*;
import javax.swing.*;

public class MessageClient {
    public static void main(String[] args) {
        System.out.println("Starting Message System...");
        Scanner in = new Scanner(System.in);
        MessageClient mc = new MessageClient();
        String input;
        System.out.println(":System Started, type help for help.");
        System.out.print(":");
        while (true) {
            input = in.nextLine();
            if (input.equalsIgnoreCase("HELP")) {
                mc.printHelp();
                System.out.print(":");
            } else if (input.equalsIgnoreCase("QUIT")) {
                System.exit(0);
            } else if (input.equalsIgnoreCase("CONNECT")) {
                mc.connect(in);
                in.nextLine();
                System.out.print(":");
            } else {
                System.out.print("No command found.\n:");
            }
        }
    }
    public static void printHelp() {
        System.out.println("help\tShow this prompt\nconnect\tStarts a new connection\nquit\tQuit the program\nexit\tExit a connection");
    }
    public void connect(Scanner in) {
        Socket soc = null;
        InetAddress addr = null;
        System.out.print("IP_ADDRESS/HOST:");
        String ip = in.nextLine();
        System.out.print("PORT:");
        int port = in.nextInt();
        try {
            System.out.println("Attempting to connect to HOST:\'" + ip + "\' on PORT:\'" + port + "\'");
            addr = InetAddress.getByName(ip);
            soc = new Socket(addr, port);
        } catch(Exception e) {
            System.out.println("Error connecting to server: " + e.getLocalizedMessage());
            return;
        }
        SwingUtilities.invokeLater(new MessageGUI(ip + ":" + port, soc));
    }
}

class MessageGUI implements Runnable {
    public MessageGUI(String windowName, Socket server) {
        JFrame window = new JFrame(windowName);
        window.setDefaultCloseOperation(JFrame.DISPOSE_ON_CLOSE);
        window.setSize(500, 300);
        window.setLayout(new BorderLayout());
        window.setVisible(true);

        MessageReceive mr = new MessageReceive(server);
        mr.setEditable(false);
        mr.setBackground(new Color(0, 0, 0));
        mr.setForeground(new Color(0, 255, 0));
        mr.setVisible(true);
        new Thread(mr).start();
        window.add(mr, BorderLayout.CENTER);

        DataOutputStream dos = null;
        try {
            dos = new DataOutputStream(server.getOutputStream());
        } catch(Exception e) {
            System.out.println("Error creating output stream to server: " + e.getLocalizedMessage());
        }

        JTextField input = new JTextField();
        input.addActionListener(new MessageSend(server, input, dos));
        input.setBackground(new Color(0, 0, 0));
        input.setForeground(new Color(0, 255, 0));
        window.add(input, BorderLayout.PAGE_END);

        System.out.println("Displaying connection.");
    }
    public void run() {}
}

class MessageReceive extends JTextArea implements Runnable {
    protected Socket server;
    public MessageReceive(Socket server) {
        this.server = server;
    }
    public void run() {
        DataInputStream dis = null;
        int bytes;
        try {
            dis = new DataInputStream(server.getInputStream());
        } catch(Exception e) {
            System.out.println("Error connecting server: " + e.getLocalizedMessage());
        }
        this.append("Connected.\n");
        while (true) {
            try {
                while ((bytes = dis.read()) != -1) this.append(String.valueOf((char) bytes));
            } catch(Exception e) {
                System.out.println("Error reading from server: " + e.getLocalizedMessage());
                return;
            }
        }
    }
}

class MessageSend implements ActionListener {
    protected Socket server;
    protected JTextField input;
    protected DataOutputStream dos = null;
    public MessageSend(Socket server, JTextField input, DataOutputStream dos) {
        this.server = server;
        this.input = input;
        this.dos = dos;
    }
    public void actionPerformed(ActionEvent ae) {
        try {
            dos.writeBytes(input.getText() + "\n");
            input.setText("");
        } catch(Exception e) {
            System.out.println("Error writing to server output stream: " + e.getLocalizedMessage());
        }
    }
}

这是服务器的代码:

import java.io.*;
import java.net.*;
import java.util.*;

public class MessageServer {
    public static void main(String[] args) {
        int port = Integer.parseInt(args[0]);
        MessageServer ms = new MessageServer();
        System.out.println("Starting server on port " + port + "...");
        ServerSocket ss = null;
        try {
            ss = new ServerSocket(port);
        } catch(Exception e) {
            System.out.println("Error creating server: " + e.getLocalizedMessage());
            System.exit(0);
        }
        System.out.println("Created server port, now waiting for users...");
        Socket client = null;
        DatagramSocket ds = null;
        try {
            ds = new DatagramSocket(4);
        } catch(Exception e) {
            System.out.println("IN:Error creating Datagram Server: " + e.getLocalizedMessage());
            e.printStackTrace();
            System.exit(0);
        }
        while (true) {
            try {
                client = ss.accept();
                System.out.println("Connecting user: " + client.getInetAddress().toString());
            } catch(Exception e) {
                System.out.println("Error on server: " + e.getLocalizedMessage());
            }
            new MessageConnectionIn(client, ds).start();
            new MessageConnectionOut(client, ds).start();
        }
    }
}

class MessageConnectionOut extends Thread {
    protected Socket client;
    public DatagramSocket ds;
    public MessageConnectionOut(Socket client, DatagramSocket ds) {
        this.client = client;
        this.ds = ds;
    }
    public void run() {
        this.setName(client.getInetAddress().getHostAddress() + ":OUT");
        try {
            System.out.println("OUT:User connected.");
            DataOutputStream dos = new DataOutputStream(client.getOutputStream());
            while (true) {
                byte[] outgoing = new byte[4096];
                DatagramPacket dp = new DatagramPacket(outgoing, outgoing.length);
                ds.receive(dp);
                dos.writeChars(new String(outgoing) + "\n");
            }
        } catch(Exception e) {
            System.out.println("OUT:Error connecting " + this.getName() + ": " + e.getLocalizedMessage());
            return;
        }
    }
}

class MessageConnectionIn extends Thread {
    protected Socket client;
    public DatagramSocket ds;
    public MessageConnectionIn(Socket client, DatagramSocket ds) {
        this.client = client;
        this.ds = ds;
    }
    public void run() {
        this.setName(client.getInetAddress().getHostAddress() + ":IN");
        try {
            System.out.println("IN:User connected.");
            BufferedReader br = new BufferedReader(new InputStreamReader(client.getInputStream()));
            while (true) {
                String lineIn = br.readLine();
                byte[] input = lineIn.getBytes();
                System.out.println(lineIn);
                byte[] output = new byte[4096];
                for (int c = 0; c < output.length; c++) output[c] = 0x0;
                for (int i = 0; i < input.length && i < output.length; i++) output[i] = input[i];
                DatagramPacket dp = new DatagramPacket(output, output.length, InetAddress.getLocalHost(), 4);
                ds.send(dp);
            }
        } catch(Exception e) {
            System.out.println("IN:Error connecting to " + this.getName() + ": " + e.getLocalizedMessage());
            return;
        }
    }
}

更新:

我尝试用MulticastSockets替换所有DatagramSockets并将其声明为MessageServer.main()时添加到组中。发生了同样的问题。

组播代码:

public class MessageServer {
    public static void main(String[] args) {
        int port = Integer.parseInt(args[0]);
        MessageServer msgsrv = new MessageServer();
        System.out.println("Starting server on port " + port + "...");
        ServerSocket ss = null;
        try {
            ss = new ServerSocket(port);
        } catch(Exception e) {
            System.out.println("Error creating server: " + e.getLocalizedMessage());
            System.exit(0);
        }
        System.out.println("Created server port, now waiting for users...");
        Socket client = null;
        MulticastSocket ms = null;
        try {
            ms = new MulticastSocket(4);
            ms.joinGroup(InetAddress.getByName("225.65.65.65"));
        } catch(Exception e) {
            System.out.println("IN:Error creating Datagram Server: " + e.getLocalizedMessage());
            e.printStackTrace();
            System.exit(0);
        }
        while (true) {
            try {
                client = ss.accept();
                System.out.println("Connecting user: " + client.getInetAddress().toString());
            } catch(Exception e) {
                System.out.println("Error on server: " + e.getLocalizedMessage());
            }
            new MessageConnectionIn(client, ms).start();
            new MessageConnectionOut(client, ms).start();
        }
    }
}

class MessageConnectionOut extends Thread {
    protected Socket client;
    public MulticastSocket ms;
    public MessageConnectionOut(Socket client, MulticastSocket ms) {
        this.client = client;
        this.ms = ms;
    }
    public void run() {
        this.setName(client.getInetAddress().getHostAddress() + ":OUT");
        try {
            System.out.println("OUT:User connected.");
            DataOutputStream dos = new DataOutputStream(client.getOutputStream());
            while (true) {
                byte[] outgoing = new byte[4096];
                DatagramPacket dp = new DatagramPacket(outgoing, outgoing.length);
                ms.receive(dp);
                dos.writeChars(new String(outgoing) + "\n");
                System.out.println("SENT_TO:" + this.getName());
            }
        } catch(Exception e) {
            System.out.println("OUT:Error connecting " + this.getName() + ": " + e.getLocalizedMessage());
            return;
        }
    }
}

class MessageConnectionIn extends Thread {
    protected Socket client;
    public MulticastSocket ms;
    public MessageConnectionIn(Socket client, MulticastSocket ms) {
        this.client = client;
        this.ms = ms;
    }
    public void run() {
        this.setName(client.getInetAddress().getHostAddress() + ":IN");
        try {
            System.out.println("IN:User connected.");
            BufferedReader br = new BufferedReader(new InputStreamReader(client.getInputStream()));
            while (true) {
                String lineIn = br.readLine();
                byte[] input = lineIn.getBytes();
                System.out.println(lineIn);
                byte[] output = new byte[4096];
                for (int c = 0; c < output.length; c++) output[c] = 0x0;
                for (int i = 0; i < input.length && i < output.length; i++) output[i] = input[i];
                DatagramPacket dp = new DatagramPacket(output, output.length, InetAddress.getLocalHost(), 4);
                ms.send(dp);
            }
        } catch(Exception e) {
            System.out.println("IN:Error connecting to " + this.getName() + ": " + e.getLocalizedMessage());
            return;
        }
    }
}

参考方案

该示例可能会帮助您。

服务器有2个线程。

一种用于读取UDP消息。我使用了2个不同的端口,因为我只想避免通过同一进程读取消息。我没有2台机器对其进行测试。在我的本地主机上测试。
另一个线程将广播读取器线程接收到的UDP消息。

有一个线程安全列表,在线程之间充当数据同步。接收到的数据已添加到列表中。广播者线程在列表中轮询数据,如果有广播,则睡眠500微秒。使用执行程序创建线程。

private final static String INET_ADDR = "224.0.0.3";
private final static int PORT1 = 8888;
private final static int PORT2 = 8889;
private static List<String> threadSafeList = null;

public static void main(String[] args) throws UnknownHostException, InterruptedException {
    threadSafeList = new CopyOnWriteArrayList<String>();
    ExecutorService executorService = Executors.newFixedThreadPool(2);
    executorService.submit(new Sender(InetAddress.getByName(INET_ADDR), PORT1));
    executorService.submit(new Receiver(InetAddress.getByName(INET_ADDR), PORT2));
    executorService.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
}

private static class Receiver implements Runnable {

    private InetAddress addr;
    private int port;

    public Receiver (InetAddress inetAddress, int port) throws UnknownHostException {
        this.addr = InetAddress.getByName(INET_ADDR);
        this.port = port;
    }

    public void run() {
        System.out.println(" @ Receiver ");
        System.out.println(" @ Receiver " + this.port);
        byte[] buf = new byte[256];

        try {
            MulticastSocket clientSocket = new MulticastSocket(this.port);
            //Joint the Multicast group.
            clientSocket.joinGroup(this.addr);

            while (true) {
                // Receive the information and print it.
                DatagramPacket msgPacket = new DatagramPacket(buf, buf.length);
                clientSocket.receive(msgPacket);

                String msg = new String(buf, 0, buf.length);
                System.out.println("Socket 1 received msg: " + msg);
                threadSafeList.add(msg);
            }
        } catch (IOException ex) {
            ex.printStackTrace();
        }
    }
}

private static class Sender implements Runnable {

    private InetAddress addr;
    private int port;

    public Sender (InetAddress inetAddress, int port) throws UnknownHostException {
        this.addr = InetAddress.getByName(INET_ADDR);
        this.port = port;
    }

    public void run() {
        System.out.println(" @ Sender Address " + new String(this.addr.getAddress()));
        System.out.println(" @ Sender port " + this.port);
        // Open a new DatagramSocket, which will be used to send the data.
        while (true) {
            try (DatagramSocket serverSocket = new DatagramSocket()) {
                for (Iterator<String> it = threadSafeList.iterator(); !threadSafeList.isEmpty() && it.hasNext(); ) {

                    String i = it.next();
                    String msg = "Sent message no " + i;

                    // Create a packet that will contain the data
                    // (in the form of bytes) and send it.
                    DatagramPacket msgPacket = new DatagramPacket(msg.getBytes(), msg.getBytes().length, this.addr, this.port);
                    serverSocket.send(msgPacket);

                    threadSafeList.remove(i);
                    System.out.println("Server sent packet with msg: " + msg);
                }
            } catch (IOException ex) {
                ex.printStackTrace();
            }
            try {
                System.out.println("going for sleep"); 
                Thread.currentThread().sleep(500);
                System.out.println("going for sleeping"); 
            } catch (InterruptedException e) {
                e.printStackTrace();
            } 
        }
    }
}

可以通过更改发送方线程的创建来修改设计。每当接收方线程收到消息时,就创建一个发送方线程并进行广播并关闭该线程。您可以使用可重用的线程池,而不是本示例中使用的固定线程池。而且,您可以在创建发送方线程时将消息作为参数传递(因此可能根本不需要列表)并进行提交。我有代码。

    public static void main(String[] args) throws UnknownHostException,
        InterruptedException {
    ExecutorService executorService = Executors.newCachedThreadPool();
    executorService.submit(new Receiver(InetAddress.getByName(INET_ADDR),
            PORT2, executorService));
    executorService.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
}

还有内部阶级

    private static class Receiver implements Runnable {

    private InetAddress addr;
    private int port;
    private ExecutorService executorService;

    public Receiver(InetAddress inetAddress, int port,
            ExecutorService executorService) throws UnknownHostException {
        this.addr = InetAddress.getByName(INET_ADDR);
        this.port = port;
        this.executorService = executorService;
    }

    public void run() {
        System.out.println(" @ Receiver ");
        System.out.println(" @ Receiver " + this.port);
        byte[] buf = new byte[256];

        try {
            MulticastSocket clientSocket = new MulticastSocket(this.port);
            // Joint the Multicast group.
            clientSocket.joinGroup(this.addr);

            while (true) {
                // Receive the information and print it.
                DatagramPacket msgPacket = new DatagramPacket(buf,
                        buf.length);
                clientSocket.receive(msgPacket);

                String msg = new String(buf, 0, buf.length);
                System.out.println("Socket 1 received msg: " + msg);
                executorService.submit(new Sender(InetAddress
                        .getByName(INET_ADDR), PORT1, msg));
            }
        } catch (IOException ex) {
            ex.printStackTrace();
        }
    }
}

    private static class Sender implements Runnable {

    private InetAddress addr;
    private int port;
    private String message;

    public Sender(InetAddress inetAddress, int port, String message)
            throws UnknownHostException {
        this.addr = InetAddress.getByName(INET_ADDR);
        this.port = port;
        this.message = message;
    }

    public void run() {
        System.out.println(" @ Sender Address "
                + new String(this.addr.getAddress()));
        System.out.println(" @ Sender port " + this.port);
        try {
            DatagramSocket serverSocket = new DatagramSocket();
            String msg = "Sent message no " + message;

            // Create a packet that will contain the data
            // (in the form of bytes) and send it.
            DatagramPacket msgPacket = new DatagramPacket(msg.getBytes(),
                    msg.getBytes().length, this.addr, this.port);
            serverSocket.send(msgPacket);

            System.out.println("Server sent packet with msg: " + msg);
        } catch (IOException ex) {
            ex.printStackTrace();
        }
    }
}

客户端有2个线程,

一种用于读取广播者的消息。
另一个用于循环发送5条消息。完成后,线程将关闭。

这里没有数据交换,因此没有线程安全列表。

    private static class Receiver implements Runnable {

    private InetAddress addr;
    private int port;

    public Receiver(InetAddress inetAddress, int port)
            throws UnknownHostException {
        this.addr = InetAddress.getByName(INET_ADDR);
        this.port = port;
    }

    public void run() {
        System.out.println(" @ Receiver ");
        System.out.println(" @ Receiver port " + this.port);
        byte[] buf = new byte[256];

        try (MulticastSocket clientSocket = new MulticastSocket(this.port)) {
            // Joint the Multicast group.
            clientSocket.joinGroup(this.addr);
            while (true) {
                // Receive the information and print it.
                DatagramPacket msgPacket = new DatagramPacket(buf,
                        buf.length);
                clientSocket.receive(msgPacket);

                String msg = new String(buf, 0, buf.length);
                System.out.println("Socket 1 received msg: " + msg);
            }
        } catch (IOException ex) {
            ex.printStackTrace();
        }
    }
}

    private static class Sender implements Runnable {

    private InetAddress addr;
    private int port;

    public Sender(InetAddress inetAddress, int port)
            throws UnknownHostException {
        this.addr = InetAddress.getByName(INET_ADDR);
        this.port = port;
    }

    public void run() {
        System.out.println(" @ Sender Address "
                + new String(this.addr.getAddress()));
        System.out.println(" @ Sender port " + this.port);
        // Open a new DatagramSocket, which will be used to send the data.
        try {
            DatagramSocket serverSocket = new DatagramSocket();

            for (int i = 0; i < 5; i++) {

                System.out.println("inside loop");
                String msg = "Sent message no 2" + i;

                // Create a packet that will contain the data
                // (in the form of bytes) and send it.
                DatagramPacket msgPacket = new DatagramPacket(
                        msg.getBytes(), msg.getBytes().length, this.addr,
                        this.port);
                System.out.println("Before sending to socket");
                serverSocket.send(msgPacket);

                System.out.println("Server sent packet with msg: " + msg);
            }
        } catch (IOException ex) {
            ex.printStackTrace();
        }
    }
}

This article sample code is extended further.

代码需要微调。

System.out.printf不打印整数参数 - java

我是Java编程的新手,无法从另一个类返回方法。这两个类都可以编译并成功运行。我可以从一个类中调用一个简单的int,但是当我想计算用户输入的两个输入整数时,我只会得到一个空格。这是我的计算课class calculations { public final int AGE = 53; public int numbers(int num1, int num2…

在小数点后三位显示秒。 -Java - java

我在Java程序中使用毫秒,并将其转换为秒。我的方法执行完此操作后,它将以长格式返回秒。System.out.println("[" + threadID + "]" + " " + "SeqSum res=" + grandTotal + " secs=" …

Java SE 7:执行顺序 - java

我正在为Java SE 7考试而学习,并且正在研究示例问题。我似乎无法弄清楚为什么以下程序以x y c g的顺序返回。我理解为什么首先运行x,因为它是一个静态初始化块,但是有人可以解释为什么y在c和g之前运行吗?public class Triangle { Triangle() { System.out.print("c "); } {…

Java Double与BigDecimal - java

我正在查看一些使用双精度变量来存储(360-359.9998779296875)结果为0.0001220703125的代码。 double变量将其存储为-1.220703125E-4。当我使用BigDecimal时,其存储为0.0001220703125。为什么将它双重存储为-1.220703125E-4? 参考方案 我不会在这里提及精度问题,而只会提及数字…

当回复有时是一个对象有时是一个数组时,如何在使用改造时解析JSON回复? - java

我正在使用Retrofit来获取JSON答复。这是我实施的一部分-@GET("/api/report/list") Observable<Bills> listBill(@Query("employee_id") String employeeID); 而条例草案类是-public static class…