【Redis】Redis 客户端连接与编程实践——Python/Java/Node.js 连接 Redis、实现计数器、缓存接口

【Redis】Redis 客户端连接与编程实践——Python/Java/Node.js 连接 Redis、实现计数器、缓存接口

Redis 客户端连接与编程实践 💻

引言 🎯

哈喽各位码友们!老曹今天要带大家进入 Redis 编程的精彩世界!很多小伙伴都会问:“Redis 命令行我会用了,但怎么在程序里用呢?” 别急,今天老曹就手把手教你如何在各种编程语言中优雅地使用 Redis!

🎯 学习目标:

  • 掌握主流语言的 Redis 客户端使用
  • 学会实现常见的业务场景
  • 理解连接池和性能优化
  • 避免编程中的常见坑

1️⃣ Python 客户端实战 🐍

1.1 redis-py 基础使用 🔧

import redis import json # 基础连接 r = redis.Redis( host='localhost', port=6379, db=0, password='your_password', decode_responses=True# 自动解码bytes为str)# 字符串操作 r.set('username','老曹') name = r.get('username')print(f"用户名: {name}")# 数字操作 r.set('counter',0) r.incr('counter')# 自增 r.incrby('counter',5)# 增加指定值 count = r.get('counter')print(f"计数器: {count}")

1.2 连接池配置 ⚡

# 创建连接池 pool = redis.ConnectionPool( host='localhost', port=6379, db=0, password='your_password', max_connections=20, retry_on_timeout=True, socket_keepalive=True, health_check_interval=30)# 使用连接池 r = redis.Redis(connection_pool=pool)# 批量操作示例defbatch_set_users(users_data): pipe = r.pipeline()for user_id, user_info in users_data.items(): key =f"user:{user_id}" pipe.hset(key, mapping=user_info) pipe.expire(key,3600)# 1小时过期 pipe.execute()# 使用示例 users ={'1001':{'name':'张三','age':25,'city':'北京'},'1002':{'name':'李四','age':30,'city':'上海'}} batch_set_users(users)

1.3 实战案例:用户登录系统 🔐

import hashlib import time from datetime import datetime, timedelta classUserAuthSystem:def__init__(self, redis_client): self.r = redis_client self.session_prefix ="session:" self.user_prefix ="user:"defregister_user(self, username, password, email):"""用户注册"""# 检查用户名是否已存在if self.r.exists(f"{self.user_prefix}{username}"):returnFalse,"用户名已存在"# 密码加密存储 salt = hashlib.sha256(str(time.time()).encode()).hexdigest()[:8] hashed_password = hashlib.sha256((password + salt).encode()).hexdigest() user_data ={'username': username,'password': hashed_password,'salt': salt,'email': email,'created_at':str(datetime.now()),'login_count':0} self.r.hset(f"{self.user_prefix}{username}", mapping=user_data)returnTrue,"注册成功"deflogin(self, username, password):"""用户登录""" user_key =f"{self.user_prefix}{username}" user_data = self.r.hgetall(user_key)ifnot user_data:returnFalse,"用户不存在"# 验证密码 salt = user_data['salt'] hashed_input = hashlib.sha256((password + salt).encode()).hexdigest()if hashed_input != user_data['password']:returnFalse,"密码错误"# 创建会话 session_id = hashlib.md5(f"{username}{time.time()}".encode()).hexdigest() session_data ={'username': username,'login_time':str(datetime.now()),'expires_at':str(datetime.now()+ timedelta(hours=2))} session_key =f"{self.session_prefix}{session_id}" self.r.hset(session_key, mapping=session_data) self.r.expire(session_key,7200)# 2小时过期# 更新登录次数 self.r.hincrby(user_key,'login_count',1)returnTrue, session_id defvalidate_session(self, session_id):"""验证会话有效性""" session_key =f"{self.session_prefix}{session_id}" session_data = self.r.hgetall(session_key)ifnot session_data:returnFalse,"会话不存在"# 检查是否过期 expires_at = datetime.fromisoformat(session_data['expires_at'])if datetime.now()> expires_at: self.r.delete(session_key)returnFalse,"会话已过期"returnTrue, session_data['username']deflogout(self, session_id):"""用户登出""" session_key =f"{self.session_prefix}{session_id}"returnbool(self.r.delete(session_key))# 使用示例if __name__ =="__main__": r = redis.Redis(decode_responses=True) auth_system = UserAuthSystem(r)# 注册用户 success, message = auth_system.register_user("laocao","123456","[email protected]")print(f"注册结果: {message}")# 用户登录 success, result = auth_system.login("laocao","123456")if success: session_id = result print(f"登录成功,Session ID: {session_id}")# 验证会话 valid, username = auth_system.validate_session(session_id)print(f"会话验证: {'有效'if valid else'无效'}")# 登出 auth_system.logout(session_id)print("已登出")

2️⃣ Java 客户端实战 ☕

2.1 Jedis 基础使用 🔧

importredis.clients.jedis.Jedis;importredis.clients.jedis.JedisPool;importredis.clients.jedis.JedisPoolConfig;publicclassRedisJedisExample{// 连接池配置privatestaticJedisPool jedisPool;static{JedisPoolConfig config =newJedisPoolConfig(); config.setMaxTotal(20); config.setMaxIdle(10); config.setMinIdle(5); config.setTestOnBorrow(true); config.setTestOnReturn(true); config.setTestWhileIdle(true); jedisPool =newJedisPool(config,"localhost",6379,2000,"your_password");}publicstaticvoidmain(String[] args){// 基础操作示例try(Jedis jedis = jedisPool.getResource()){// 字符串操作 jedis.set("greeting","Hello Redis!");String greeting = jedis.get("greeting");System.out.println("Greeting: "+ greeting);// 数字操作 jedis.set("counter","0"); jedis.incr("counter"); jedis.incrBy("counter",5);Long counter = jedis.incrBy("counter",3);System.out.println("Counter: "+ counter);// Hash操作 jedis.hset("user:1001","name","老曹"); jedis.hset("user:1001","age","18");String name = jedis.hget("user:1001","name");System.out.println("User name: "+ name);}catch(Exception e){ e.printStackTrace();}}}

2.2 Lettuce 高级使用 🚀

importio.lettuce.core.*;importio.lettuce.core.api.StatefulRedisConnection;importio.lettuce.core.api.sync.RedisCommands;importio.lettuce.core.support.ConnectionPoolSupport;importorg.apache.commons.pool2.impl.GenericObjectPool;importorg.apache.commons.pool2.impl.GenericObjectPoolConfig;importjava.time.Duration;importjava.util.HashMap;importjava.util.Map;publicclassRedisLettuceExample{privatestaticGenericObjectPool<StatefulRedisConnection<String,String>> pool;static{// Redis客户端配置RedisURI redisUri =RedisURI.Builder.redis("localhost",6379).withPassword("your_password").withDatabase(0).build();RedisClient client =RedisClient.create(redisUri);// 连接池配置GenericObjectPoolConfig<StatefulRedisConnection<String,String>> poolConfig =newGenericObjectPoolConfig<>(); poolConfig.setMaxTotal(20); poolConfig.setMaxIdle(10); poolConfig.setMinIdle(5); poolConfig.setTestOnBorrow(true); pool =ConnectionPoolSupport.createGenericObjectPool(()-> client.connect(), poolConfig);}publicstaticvoidmain(String[] args){try{StatefulRedisConnection<String,String> connection = pool.borrowObject();RedisCommands<String,String> sync = connection.sync();// 批量操作Map<String,String> userData =newHashMap<>(); userData.put("name","老曹"); userData.put("age","18"); userData.put("city","北京"); sync.hmset("user:1002", userData);// 获取所有字段Map<String,String> result = sync.hgetall("user:1002");System.out.println("User data: "+ result);// 管道操作 sync.setAutoFlushCommands(false);for(int i =0; i <1000; i++){ sync.set("key:"+ i,"value:"+ i);} sync.flushCommands(); sync.setAutoFlushCommands(true); pool.returnObject(connection);}catch(Exception e){ e.printStackTrace();}}}

2.3 实战案例:分布式计数器 📊

importredis.clients.jedis.Jedis;importredis.clients.jedis.JedisPool;importredis.clients.jedis.Transaction;publicclassDistributedCounter{privateJedisPool jedisPool;privateString counterKey;publicDistributedCounter(JedisPool pool,String key){this.jedisPool = pool;this.counterKey = key;}/** * 原子性递增计数器 */publiclongincrement(){try(Jedis jedis = jedisPool.getResource()){return jedis.incr(counterKey);}}/** * 带过期时间的递增 */publiclongincrementWithExpire(int expireSeconds){try(Jedis jedis = jedisPool.getResource()){Transaction tx = jedis.multi(); tx.incr(counterKey); tx.expire(counterKey, expireSeconds); tx.exec();return jedis.get(counterKey)==null?0:Long.parseLong(jedis.get(counterKey));}}/** * 获取当前计数值 */publiclonggetCurrentValue(){try(Jedis jedis = jedisPool.getResource()){String value = jedis.get(counterKey);return value ==null?0:Long.parseLong(value);}}/** * 重置计数器 */publicvoidreset(){try(Jedis jedis = jedisPool.getResource()){ jedis.del(counterKey);}}/** * 带限流的计数器 */publicbooleanincrementWithLimit(long limit){try(Jedis jedis = jedisPool.getResource()){long current = jedis.incr(counterKey);if(current ==1){// 第一次设置过期时间 jedis.expire(counterKey,60);// 60秒窗口}return current <= limit;}}// 使用示例publicstaticvoidmain(String[] args)throwsInterruptedException{JedisPool pool =newJedisPool("localhost",6379);DistributedCounter counter =newDistributedCounter(pool,"api_request_count");// 模拟API请求计数for(int i =0; i <10; i++){boolean allowed = counter.incrementWithLimit(5);System.out.println("Request "+(i+1)+": "+(allowed ?"Allowed":"Rate Limited"));Thread.sleep(100);} pool.close();}}

3️⃣ Node.js 客户端实战 🟩

3.1 ioredis 基础使用 🔧

const Redis =require('ioredis');// 基础连接const redis =newRedis({port:6379,host:'localhost',password:'your_password',db:0,retryDelayOnFailover:300,showFriendlyErrorStack:true});// Promise 方式使用asyncfunctionbasicOperations(){try{// 字符串操作await redis.set('username','老曹');const name =await redis.get('username'); console.log('用户名:', name);// 数字操作await redis.set('counter',0);await redis.incr('counter');await redis.incrby('counter',5);const count =await redis.get('counter'); console.log('计数器:', count);// Hash操作await redis.hset('user:1001','name','老曹','age','18');const userInfo =await redis.hgetall('user:1001'); console.log('用户信息:', userInfo);}catch(error){ console.error('Redis操作失败:', error);}}// 管道操作asyncfunctionpipelineDemo(){const pipeline = redis.pipeline();// 批量设置for(let i =0; i <100; i++){ pipeline.set(`key:${i}`,`value:${i}`);}// 执行管道const results =await pipeline.exec(); console.log('管道执行完成,影响条数:', results.length);}// 发布订阅functionpubSubDemo(){// 订阅者const subscriber =newRedis(); subscriber.subscribe('news'); subscriber.on('message',(channel, message)=>{ console.log(`收到频道 ${channel} 的消息:`, message);});// 发布者setTimeout(()=>{ redis.publish('news','Hello Redis Pub/Sub!');},1000);}

3.2 连接池和集群配置 ⚡

const Redis =require('ioredis');// 连接池配置const cluster =newRedis.Cluster([{port:7000,host:'127.0.0.1'},{port:7001,host:'127.0.0.1'}],{redisOptions:{password:'cluster_password'},scaleReads:'slave',// 读操作分流到从节点enableOfflineQueue:true,retryDelayOnFailover:300});// 连接事件处理 cluster.on('connect',()=>{ console.log('Redis集群连接成功');}); cluster.on('error',(err)=>{ console.error('Redis集群连接错误:', err);});// 健康检查setInterval(async()=>{try{const pong =await cluster.ping(); console.log('Redis集群健康检查:', pong);}catch(error){ console.error('健康检查失败:', error);}},30000);

3.3 实战案例:缓存装饰器 🎨

const Redis =require('ioredis');const crypto =require('crypto');classCacheDecorator{constructor(redisClient, defaultTTL =3600){this.redis = redisClient;this.defaultTTL = defaultTTL;}// 生成缓存键generateCacheKey(funcName,...args){const argsString =JSON.stringify(args);const hash = crypto.createHash('md5').update(argsString).digest('hex');return`cache:${funcName}:${hash}`;}// 缓存装饰器cache(ttl =this.defaultTTL){return(target, propertyName, descriptor)=>{const method = descriptor.value; descriptor.value=asyncfunction(...args){const cacheKey =this.generateCacheKey(propertyName,...args);// 尝试从缓存获取try{const cached =awaitthis.redis.get(cacheKey);if(cached){ console.log(`缓存命中: ${cacheKey}`);returnJSON.parse(cached);}}catch(error){ console.error('缓存读取失败:', error);}// 执行原方法const result =awaitmethod.apply(this, args);// 存储到缓存try{awaitthis.redis.setex(cacheKey, ttl,JSON.stringify(result)); console.log(`缓存已设置: ${cacheKey}`);}catch(error){ console.error('缓存存储失败:', error);}return result;};return descriptor;};}}// 使用示例classUserService{constructor(){this.redis =newRedis();this.cache =newCacheDecorator(this.redis,1800);// 30分钟缓存} @this.cache.cache(3600)// 1小时缓存asyncgetUserById(userId){ console.log(`从数据库查询用户: ${userId}`);// 模拟数据库查询awaitnewPromise(resolve=>setTimeout(resolve,100));return{id: userId,name:`用户${userId}`,email:`user${userId}@example.com`,createdAt:newDate()};} @this.cache.cache(1800)// 30分钟缓存asyncgetUserPosts(userId){ console.log(`查询用户${userId}的文章`);// 模拟数据库查询awaitnewPromise(resolve=>setTimeout(resolve,200));return[{id:1,title:'文章1',content:'内容1'},{id:2,title:'文章2',content:'内容2'}];}}// 测试缓存效果asyncfunctiontestCache(){const userService =newUserService(); console.log('=== 第一次调用 ===');await userService.getUserById(123);await userService.getUserPosts(123); console.log('\n=== 第二次调用(应该命中缓存) ===');await userService.getUserById(123);await userService.getUserPosts(123);// 清理 process.exit(0);}

4️⃣ Go 客户端实战 🐹

4.1 go-redis 基础使用 🔧

package main import("context""fmt""github.com/go-redis/redis/v8""time")var ctx = context.Background()funcmain(){// 创建客户端 rdb := redis.NewClient(&redis.Options{ Addr:"localhost:6379", Password:"your_password", DB:0,})// 测试连接 pong, err := rdb.Ping(ctx).Result()if err !=nil{panic(err)} fmt.Println("Redis连接成功:", pong)// 基础操作basicOperations(rdb)// 管道操作pipelineOperations(rdb)// 事务操作transactionExample(rdb)}funcbasicOperations(rdb *redis.Client){// 字符串操作 err := rdb.Set(ctx,"username","老曹",0).Err()if err !=nil{panic(err)} val, err := rdb.Get(ctx,"username").Result()if err !=nil{panic(err)} fmt.Println("用户名:", val)// 数字操作 rdb.Set(ctx,"counter",0,0) rdb.Incr(ctx,"counter") rdb.IncrBy(ctx,"counter",5) count, err := rdb.Get(ctx,"counter").Int64()if err !=nil{panic(err)} fmt.Println("计数器:", count)// Hash操作 rdb.HSet(ctx,"user:1001",map[string]interface{}{"name":"老曹","age":18,"city":"北京",}) userInfo, err := rdb.HGetAll(ctx,"user:1001").Result()if err !=nil{panic(err)} fmt.Println("用户信息:", userInfo)}funcpipelineOperations(rdb *redis.Client){ pipe := rdb.TxPipeline()// 批量操作for i :=0; i <100; i++{ pipe.Set(ctx, fmt.Sprintf("key:%d", i), fmt.Sprintf("value:%d", i),0)}// 执行管道 cmds, err := pipe.Exec(ctx)if err !=nil{panic(err)} fmt.Printf("管道执行完成,命令数: %d\n",len(cmds))}functransactionExample(rdb *redis.Client){// 乐观锁事务 err := rdb.Watch(ctx,func(tx *redis.Tx)error{// 获取当前值 n, err := tx.Get(ctx,"counter").Int()if err !=nil&& err != redis.Nil {return err }// 执行事务_, err = tx.TxPipelined(ctx,func(pipe redis.Pipeliner)error{ pipe.Set(ctx,"counter", n+1,0)returnnil})return err },"counter")if err !=nil{panic(err)} newVal,_:= rdb.Get(ctx,"counter").Result() fmt.Println("事务后计数器:", newVal)}

4.2 实战案例:分布式锁 🔐

package main import("context""fmt""github.com/go-redis/redis/v8""math/rand""time")type DistributedLock struct{ client *redis.Client key string value string expiry time.Duration }funcNewDistributedLock(client *redis.Client, key string, expiry time.Duration)*DistributedLock {return&DistributedLock{ client: client, key:"lock:"+ key, value: fmt.Sprintf("%d", rand.Int63()), expiry: expiry,}}func(dl *DistributedLock)Acquire(ctx context.Context)(bool,error){// 尝试获取锁 success, err := dl.client.SetNX(ctx, dl.key, dl.value, dl.expiry).Result()if err !=nil{returnfalse, err }if success {// 启动续期协程go dl.renew(ctx)}return success,nil}func(dl *DistributedLock)Release(ctx context.Context)error{// 使用Lua脚本安全释放锁 script :=` if redis.call("get", KEYS[1]) == ARGV[1] then return redis.call("del", KEYS[1]) else return 0 end `return dl.client.Eval(ctx, script,[]string{dl.key}, dl.value).Err()}func(dl *DistributedLock)renew(ctx context.Context){ ticker := time.NewTicker(dl.expiry /3)defer ticker.Stop()for{select{case<-ticker.C:// 续期锁 dl.client.Expire(ctx, dl.key, dl.expiry)case<-ctx.Done():return}}}// 使用示例funcmain(){ rdb := redis.NewClient(&redis.Options{ Addr:"localhost:6379",})// 模拟并发场景for i :=0; i <5; i++{gofunc(id int){ lock :=NewDistributedLock(rdb,"critical_resource", time.Second*10) ctx, cancel := context.WithTimeout(context.Background(), time.Second*15)defercancel() acquired, err := lock.Acquire(ctx)if err !=nil{ fmt.Printf("协程%d获取锁失败: %v\n", id, err)return}if acquired { fmt.Printf("协程%d获取锁成功\n", id)// 模拟业务处理 time.Sleep(time.Second *3) lock.Release(ctx) fmt.Printf("协程%d释放锁\n", id)}else{ fmt.Printf("协程%d获取锁失败\n", id)}}(i)} time.Sleep(time.Second *20)}

5️⃣ 10大面试高频问题解答 🎓

问题1:如何选择合适的Redis客户端?

答案:

  • Python: redis-py (官方推荐)
  • Java: Lettuce (Spring Data Redis默认)
  • Node.js: ioredis (功能最全)
  • Go: go-redis/v8 (性能优秀)

问题2:连接池大小如何设置?

答案:

# 一般规则:CPU核心数 × 2 + 有效磁盘数 max_connections =(cpu_cores *2)+ disk_count # 通常设置在10-50之间比较合适

问题3:Pipeline和Transaction的区别?

答案:

  • Pipeline: 批量发送命令,减少网络往返
  • Transaction: 保证原子性,要么全部执行要么全部不执行

问题4:如何处理Redis连接异常?

答案:

# 重试机制import time from redis.exceptions import ConnectionError defsafe_redis_operation(func, max_retries=3):for attempt inrange(max_retries):try:return func()except ConnectionError:if attempt == max_retries -1:raise time.sleep(2** attempt)# 指数退避

问题5:Redis客户端如何做健康检查?

答案:

// 定期PING检查ScheduledExecutorService scheduler =Executors.newScheduledThreadPool(1); scheduler.scheduleAtFixedRate(()->{try{String pong = jedis.ping();if(!"PONG".equals(pong)){// 健康检查失败,重新连接reconnect();}}catch(Exception e){// 处理连接异常handleConnectionError(e);}},0,30,TimeUnit.SECONDS);

问题6:如何实现缓存穿透防护?

答案:

defget_user_with_cache_protection(user_id): cache_key =f"user:{user_id}"# 先查缓存 user_data = redis.get(cache_key)if user_data isnotNone:return json.loads(user_data)if user_data !="NULL"elseNone# 缓存未命中,查数据库 user_data = db.get_user(user_id)# 缓存结果(即使是空值也要缓存)if user_data: redis.setex(cache_key,3600, json.dumps(user_data))else:# 空值缓存,防止穿透 redis.setex(cache_key,300,"NULL")return user_data 

问题7:Redis客户端如何做负载均衡?

答案:

  • 客户端分片:根据key的hash值选择不同的Redis实例
  • 代理模式:使用Twemproxy或Codis等代理
  • 集群模式:Redis Cluster自动分片

问题8:如何监控Redis客户端性能?

答案:

import time from functools import wraps defmonitor_redis(func):@wraps(func)defwrapper(*args,**kwargs): start_time = time.time()try: result = func(*args,**kwargs) duration =(time.time()- start_time)*1000print(f"Redis操作 {func.__name__}: {duration:.2f}ms")return result except Exception as e: duration =(time.time()- start_time)*1000print(f"Redis操作 {func.__name__} 失败: {duration:.2f}ms, 错误: {e}")raisereturn wrapper @monitor_redisdefget_user_data(user_id):return redis.get(f"user:{user_id}")

问题9:Redis客户端连接超时如何设置?

答案:

// Node.js示例const redis =newRedis({connectTimeout:10000,// 连接超时10秒commandTimeout:5000,// 命令超时5秒retryDelayOnFailover:300// 故障转移重试延迟});

问题10:如何处理Redis大key问题?

答案:

# 分批处理大keydefscan_large_hash(key, pattern="*", count=1000): cursor =0whileTrue: cursor, keys = redis.hscan(key, cursor,match=pattern, count=count)# 处理一批keys process_keys(keys)if cursor ==0:break# 渐进式删除大keydefdelete_large_key(key):# 使用UNLINK异步删除 redis.unlink(key)

6️⃣ 最佳实践总结 📋

6.1 连接管理最佳实践 ⭐

实践项推荐做法原因
连接池必须使用避免频繁创建销毁连接
超时设置合理配置防止长时间阻塞
重试机制实现指数退避提高容错能力
健康检查定期PING及时发现连接问题

6.2 性能优化建议 🚀

优化项方法效果
批量操作使用Pipeline减少网络RTT
连接复用连接池管理降低连接开销
序列化优化选择高效格式减少传输大小
异步处理非阻塞IO提高并发性能

6.3 错误处理策略 ⚠️

# 完整的错误处理框架classRedisClientWrapper:def__init__(self, redis_client): self.redis = redis_client self.retry_delays =[0.1,0.5,1.0,2.0]# 重试延迟策略defexecute_with_retry(self, operation,*args,**kwargs):"""带重试的Redis操作""" last_exception =Nonefor attempt, delay inenumerate(self.retry_delays):try:return operation(*args,**kwargs)except(ConnectionError, TimeoutError)as e: last_exception = e if attempt <len(self.retry_delays)-1: time.sleep(delay)continueelse:# 记录监控指标 self.record_error(operation.__name__,str(e))raiseraise last_exception defrecord_error(self, operation_name, error_message):"""记录错误到监控系统"""# 发送到Prometheus、ELK等监控系统pass

结语 🎉

老曹今天的编程实战够干货吧!记住几个核心要点:

选择合适的客户端 - 根据语言特性和项目需求
善用连接池 - 这是性能的关键
异常处理要做好 - 生产环境稳定性很重要
监控不能少 - 知道系统运行状况才能及时优化

下节我们聊聊 Redis 的持久化机制,那可是数据安全的重要保障!记得关注老曹,技术路上一起飞!🚀


“代码如诗,优雅永不过时” - 老曹技术感悟

Read more

零基础学AI大模型之Milvus实战:Attu可视化安装+Python整合全案例

零基础学AI大模型之Milvus实战:Attu可视化安装+Python整合全案例

大家好,我是工藤学编程 🦉一个正在努力学习的小博主,期待你的关注实战代码系列最新文章😉C++实现图书管理系统(Qt C++ GUI界面版)SpringBoot实战系列🐷【SpringBoot实战系列】SpringBoot3.X 整合 MinIO 存储原生方案分库分表分库分表之实战-sharding-JDBC分库分表执行流程原理剖析消息队列深入浅出 RabbitMQ-RabbitMQ消息确认机制(ACK)AI大模型零基础学AI大模型之Milvus部署架构选型+Linux实战:Docker一键部署+WebUI使用 前情摘要 1、零基础学AI大模型之读懂AI大模型 2、零基础学AI大模型之从0到1调用大模型API 3、零基础学AI大模型之SpringAI 4、零基础学AI大模型之AI大模型常见概念 5、零基础学AI大模型之大模型私有化部署全指南 6、零基础学AI大模型之AI大模型可视化界面 7、零基础学AI大模型之LangChain 8、零基础学AI大模型之LangChain六大核心模块与大模型IO交互链路 9、零基础学AI大模型之Prompt提示词工程 10、零基础

By Ne0inhk

Python字节码逆向神器pycdc:从入门到精通的完整指南

Python字节码逆向神器pycdc:从入门到精通的完整指南 【免费下载链接】pycdcC++ python bytecode disassembler and decompiler 项目地址: https://gitcode.com/GitHub_Trending/py/pycdc 你是否遇到过需要分析已编译的Python字节码文件,却无法获取源代码的困境?pycdc作为一款强大的Python字节码反汇编器和反编译器,能够将Python字节码逆向还原为可读的源代码,支持从Python 1.0到3.13的全版本字节码解析。🎯 工具核心功能详解 pycdc包含两个核心组件:pycdas(反汇编器) 和 pycdc(反编译器)。与其他逆向工具相比,它的独特优势在于: * 全版本兼容:覆盖Python 1.0至3.13的所有版本 * 双工具链设计:既可生成字节码指令流,也能直接输出源代码 * 高精度还原:通过抽象语法树(AST)技术确保代码准确性 项目通过ASTNode.h和ASTree.cpp实现语法树构建,字节码处理逻辑位于bytecode.

By Ne0inhk
Python 驱动浏览器自动化:Playwright + AI 的 2026 最佳实践

Python 驱动浏览器自动化:Playwright + AI 的 2026 最佳实践

摘要:在 Web 自动化领域,Selenium 曾经的霸主地位已成历史,Playwright 凭其“快、稳、强”的现代特性成为了新标准。而在 2026 年,随着 LLM(大语言模型)和视觉多模态模型的爆发,自动化测试与 RPA(机器人流程自动化)迎来了范式革命。本文将深度解析 Playwright 的核心架构,并手把手教你构建一个具备“自愈能力”的 AI 驱动自动化 Agent。本文超 7000 字,包含大量实战代码与反爬对抗技巧。 第一章:Selenium 已死,Playwright 当立? 1.1 自动化的“不可能三角” 长期以来,Web 自动化工程师都在速度、稳定性和抗检测性之间做取舍: * Selenium:

By Ne0inhk
【C++经典例题】回文串判断:两种高效解法剖析

【C++经典例题】回文串判断:两种高效解法剖析

💓 博客主页:倔强的石头的ZEEKLOG主页             📝Gitee主页:倔强的石头的gitee主页             ⏩ 文章专栏:C++经典例题                                   期待您的关注 目录 一、问题描述 示例 二、解法一:将字母数字连接到新的 string 思路 代码实现 代码解释 复杂度分析 三、解法二:直接原地筛选判断 思路 代码实现 代码解释 复杂度分析 总结 一、问题描述 在字符串处理中,判断一个字符串是否为回文串是一个经典问题。 本题有特殊要求:在将所有大写字符转换为小写字符、并移除所有非字母数字字符之后,如果短语正着读和反着读都一样,则认为该短语是一个回文串。字母和数字都属于字母数字字符。 示例 * 输入: s = "A man, a plan, a canal: Panama"

By Ne0inhk