python操作redis set
python操作redis set

python操作redis set

1

from redis import Redis

if __name__ == '__main__':
    client = Redis(host='x.x.x.x', port=6666, decode_responses=True)
    try:
        # 添加元素
        print(client.sadd('s1', 1, 2, 3))  # 3
        print(client.sadd('s1', 3))  # 0
        print(client.sadd('s1', 2, 3, 4, 5))  # 2
        # tmp = [2, 3, 4, 5]
        # client.sadd('s1', *tmp) 这样也可以

        # 查看元素
        print(client.smembers('s1'))  # {'3', '1', '5', '4', '2'}

        # 集合大小
        print(client.scard('s1'))  # 5

        # 是否包含元素
        print(client.sismember('s1', 9))  # False
        print(client.sismember('s1', 1))  # True

        # 删除元素
        print(client.srem('s1', 4, 5))  # 2
        print(client.smembers('s1'))  # {'1', '2', '3'}

        # 随机挑几个元素
        print(client.srandmember('s1'))  # 2
        print(client.srandmember('s1', 2))  # ['3', '1']
        print(client.srandmember('s1', 4))  # ['1', '2', '3']

        # 随机pop几个
        print(client.spop('s1'))  # 1
        print(client.spop('s1', 4))  # ['2', '3']
        print(client.exists('s1'))  # 0

        # 删除所有数据
        print(client.flushall())  # True
        client.sadd('s1', 1, 2, 3)
        client.sadd('s2', 2, 3, 4)
        client.sadd('s3', 2, 5, 6)

        # 交集
        print(client.sinter('s1', 's2'))  # {'3', '2'}
        print(client.sinter('s1', 's2', 's3'))  # {'2'}

        # 差集
        print(client.sdiff('s1', 's2'))  # {'1'}

        # 并集
        print(client.sunion('s1', 's2'))  # {'1', '4', '2', '3'}

        #  sunionstore / sdiffstore / sinterstore
        print(client.sunionstore('s4', 's1', 's2', 's3'))  # 6
        print(client.smembers('s4'))  # {'3', '1', '5', '4', '6', '2'}

        # 移动元素
        print(client.smembers('s1'))  # {'1', '2', '3'}
        print(client.smembers('s2'))  # {'3', '4', '2'}
        print(client.smove('s1', 's2', 1))  # True
        print(client.smembers('s1'))  # {'3', '2'}
        print(client.smembers('s2'))  # {'1', '4', '2', '3'}
    except Exception as e:
        print(e)
    finally:
        client.close()

2

class ConnectRedis:
    def __init__(self):
        """
        初始化函数, 分别初始化 redis连接以及 url字典和title字典
        """
        self.Redis_LOCAL = {
            'host': '127.0.0.1',
            'port': 6379,
            'db': 1,
            'max_connections': 128,  # 最大连接数
            'decode_responses': True,  # 编码为utf-8
        }
        self.redis = ''  # redis对象
        self.gather_name = "qdp_user_info"
        self.conn_pool = redis.ConnectionPool(**self.Redis_LOCAL)  # 连接池

    def get_redis(self):
        # 获去一个redis连接对象
        while True:
            try:
                redis1 = redis.Redis(connection_pool=self.conn_pool)
                redis1.ping()
            except Exception as e:
                err = traceback.format_exc()
                logger.error(err)
                time.sleep(10)
                self.conn_pool = redis.ConnectionPool(**self.Redis_LOCAL)  # 出问题重新生成一下redis池
            else:  # 执行没有问题
                self.redis = redis1
                return

    def redis_decorator(func):
        """
        redis对象生成装饰器, 不用这个装饰器也可以,只要在每个用到redis的
        代码中调用一下self.get_redis() 就可以了
        """

        def b(self, *args, **kwargs):
            self.get_redis()
            return func(self, *args, **kwargs)

        return b

    @redis_decorator
    def exist_title(self, title):
        """
        判断 title的值是否存在于集合中
        """
        return self.redis.sismember(self.gather_name, title)

    @redis_decorator
    def add_title(self, title):
        """
        在字典中添加一个 title值
        """
        return self.redis.sadd(self.gather_name, title)

发表回复

您的电子邮箱地址不会被公开。