1
from redis import Redis
if __name__ == '__main__':
client = Redis(host='x.x.x.x', port=6666, decode_responses=True)
try:
# 添加元素
print(client.sadd('s1', 1, 2, 3)) # 3
print(client.sadd('s1', 3)) # 0
print(client.sadd('s1', 2, 3, 4, 5)) # 2
# tmp = [2, 3, 4, 5]
# client.sadd('s1', *tmp) 这样也可以
# 查看元素
print(client.smembers('s1')) # {'3', '1', '5', '4', '2'}
# 集合大小
print(client.scard('s1')) # 5
# 是否包含元素
print(client.sismember('s1', 9)) # False
print(client.sismember('s1', 1)) # True
# 删除元素
print(client.srem('s1', 4, 5)) # 2
print(client.smembers('s1')) # {'1', '2', '3'}
# 随机挑几个元素
print(client.srandmember('s1')) # 2
print(client.srandmember('s1', 2)) # ['3', '1']
print(client.srandmember('s1', 4)) # ['1', '2', '3']
# 随机pop几个
print(client.spop('s1')) # 1
print(client.spop('s1', 4)) # ['2', '3']
print(client.exists('s1')) # 0
# 删除所有数据
print(client.flushall()) # True
client.sadd('s1', 1, 2, 3)
client.sadd('s2', 2, 3, 4)
client.sadd('s3', 2, 5, 6)
# 交集
print(client.sinter('s1', 's2')) # {'3', '2'}
print(client.sinter('s1', 's2', 's3')) # {'2'}
# 差集
print(client.sdiff('s1', 's2')) # {'1'}
# 并集
print(client.sunion('s1', 's2')) # {'1', '4', '2', '3'}
# sunionstore / sdiffstore / sinterstore
print(client.sunionstore('s4', 's1', 's2', 's3')) # 6
print(client.smembers('s4')) # {'3', '1', '5', '4', '6', '2'}
# 移动元素
print(client.smembers('s1')) # {'1', '2', '3'}
print(client.smembers('s2')) # {'3', '4', '2'}
print(client.smove('s1', 's2', 1)) # True
print(client.smembers('s1')) # {'3', '2'}
print(client.smembers('s2')) # {'1', '4', '2', '3'}
except Exception as e:
print(e)
finally:
client.close()
2
class ConnectRedis:
def __init__(self):
"""
初始化函数, 分别初始化 redis连接以及 url字典和title字典
"""
self.Redis_LOCAL = {
'host': '127.0.0.1',
'port': 6379,
'db': 1,
'max_connections': 128, # 最大连接数
'decode_responses': True, # 编码为utf-8
}
self.redis = '' # redis对象
self.gather_name = "qdp_user_info"
self.conn_pool = redis.ConnectionPool(**self.Redis_LOCAL) # 连接池
def get_redis(self):
# 获去一个redis连接对象
while True:
try:
redis1 = redis.Redis(connection_pool=self.conn_pool)
redis1.ping()
except Exception as e:
err = traceback.format_exc()
logger.error(err)
time.sleep(10)
self.conn_pool = redis.ConnectionPool(**self.Redis_LOCAL) # 出问题重新生成一下redis池
else: # 执行没有问题
self.redis = redis1
return
def redis_decorator(func):
"""
redis对象生成装饰器, 不用这个装饰器也可以,只要在每个用到redis的
代码中调用一下self.get_redis() 就可以了
"""
def b(self, *args, **kwargs):
self.get_redis()
return func(self, *args, **kwargs)
return b
@redis_decorator
def exist_title(self, title):
"""
判断 title的值是否存在于集合中
"""
return self.redis.sismember(self.gather_name, title)
@redis_decorator
def add_title(self, title):
"""
在字典中添加一个 title值
"""
return self.redis.sadd(self.gather_name, title)