构建实时消息系统
Redis的Pub/Sub系统可以构建实时的消息系统,比如很多开发人员用Pub/Sub构建实时聊天系统。
import redis.clients.jedis.*;
import java.util.Date;
import org.apache.commons.lang3.time.DateFormatUtils;
import org.apache.commons.lang3.RandomStringUtils;
class PrintListener extends JedisPubSub{
    @Override
    public void onMessage(String channel, String message) {
        String time = DateFormatUtils.format(new Date(), "yyyy-MM-dd HH:mm:ss");
        System.out.println("message receive:" + message + ",channel:" + channel +
        "..." + time);
        //此处我们可以取消订阅
        if(message.equalsIgnoreCase("quit")){
            this.unsubscribe(channel);
        }
    }
}
class PubClient {
    private Jedis jedis;
    public PubClient(String host,int port){
        jedis = new Jedis(host,port);
    }
    public void pub(String channel,String message){
        jedis.publish(channel, message);
    }
    public void close(String channel){
        jedis.publish(channel, "quit");
        jedis.del(channel);//实时消息系统
    }
}
class SubClient {
    private Jedis jedis;//
    public SubClient(String host,int port){
        jedis = new Jedis(host,port);
    }
    public void sub(JedisPubSub listener,String channel){
        jedis.subscribe(listener, channel);
        //此处将会阻塞,在client代码级别为JedisPubSub在处理消息时,将会“独占”链接
        //并且采取了while循环的⽅方式,侦听订阅的消息
    }
}
public class PubSubTest {
    /**
    * @param args
    */
    static String host = "127.0.0.1";
    static int port = 10011;
    public static void main(String[] args) throws Exception{
    PubClient pubClient = new PubClient(host,port);
    final String channel = "pubsub-channel";
    pubClient.pub(channel, "before1");
    pubClient.pub(channel, "before2");
    Thread.sleep(2000);
    //消息订阅者⾮非常特殊,需要独占链接,因此我们需要为它创建新的链接;
    //此外,jedis客户端的实现也保证了“链接独占”的特性,sub⽅方法将⼀一直阻塞,
    //直到调⽤用listener.unsubscribe⽅方法
    Thread subThread = new Thread(new Runnable() {
        @Override
        public void run() {
            try{
                SubClient subClient = new SubClient(host,port);
                System.out.println("----------subscribe operation begin-------");
                JedisPubSub listener = new PrintListener();
                //在API级别,此处为轮询操作,直到unsubscribe调⽤用,才会返回
                subClient.sub(listener, channel);
                System.out.println("----------subscribe operation end-------")
                ;
            }catch(Exception e){
                e.printStackTrace();
            }
        }
    });
    subThread.start();
    int i=0;
    while(i < 10){
        String message = RandomStringUtils.random(6, true, true);//apache-commons
        pubClient.pub(channel, message);
        i++;
        Thread.sleep(1000);
    }
    //被动关闭指示,如果通道中,消息发布者确定通道需要关闭,那么就发送一个“quit”
    //那么在listener.onMessage()中接收到“quit”时,其他订阅client将执行“unsubscribe”操作。
    pubClient.close(channel);
    //此外,你还可以这样取消订阅
    //listener.unsubscribe(channel);
    }
}输出:
----------subscribe operation begin-------
message receive:erRIEe,channel:pubsub-channel...2016-03-15 15:53:52
message receive:Ovcwiw,channel:pubsub-channel...2016-03-15 15:53:53
message receive:STPWfV,channel:pubsub-channel...2016-03-15 15:53:54
message receive:SR4iIk,channel:pubsub-channel...2016-03-15 15:53:55
message receive:GI3Ege,channel:pubsub-channel...2016-03-15 15:53:56
message receive:0V1JUt,channel:pubsub-channel...2016-03-15 15:53:57
message receive:3iU8BV,channel:pubsub-channel...2016-03-15 15:53:58
message receive:BqeI2x,channel:pubsub-channel...2016-03-15 15:53:59
message receive:D53cHF,channel:pubsub-channel...2016-03-15 15:54:00
message receive:quit,channel:pubsub-channel...2016-03-15 15:54:01
----------subscribe operation end-------