跳到主要内容

对象池模式

前言

对象池(object pool pattern)是一种非经典GoF的创建型设计模式。一个对象池包含一组已经初始化过且可以使用的对象,可以在有需求时创建和销毁对象。池的对象可以从池子中取得对象,对其进行操作处理,并在不需要时归还给池子而非直接销毁。

如果实例化的成本高,而且需要经常实例化,但每次实例化的数量较少的情况下,使用对象池可以获得显著的效能提升。

优点

  • 提高性能:避免频繁创建和销毁对象,尤其是那些创建代价高的对象(如数据库连接、线程、网络连接等)
  • 降低内存分配影响:减少GC(垃圾回收)影响,尤其是在高并发场景下
  • 资源管理可控:可限制系统使用的资源上限,防止资源耗尽

缺点

  • 增加复杂性:需要额外维护对象池,而且操作完全依赖客户端,如果客户端忘了归还,那么其他组件在需要时将无法使用该对象

示例代码

Go

package objectpool

import (
"errors"
"log"
"math/rand"
"sync"
"time"
)

const getResMaxTime = 3 * time.Second

var (
ErrPoolNotExist = errors.New("Pool not exist")
ErrGetResTimeout = errors.New("get resource timeout")
)

type Resource struct {
resId int
}

func NewResource(id int) *Resource {
time.Sleep(500 * time.Millisecond)
return &Resource{resId: id}
}

func (r *Resource) Do(workId int) {
time.Sleep(time.Duration(rand.Intn(5)) * 100 * time.Millisecond)
log.Printf("using resource #%d for work #%d", r.resId, workId)
}

type Pool chan *Resource

func New(size int) Pool {
p := make(Pool, size)
wg := new(sync.WaitGroup)
wg.Add(size)
for i := range size {
go func(resId int) {
p <- NewResource(resId)
wg.Done()
}(i)
}
wg.Wait()
return p
}

func (p Pool) Acquire() (r *Resource, err error) {
select {
case r := <-p:
return r, nil
case <-time.After(getResMaxTime):
return nil, ErrGetResTimeout
}
}

func (p Pool) Release(r *Resource) error {
if p == nil {
return ErrPoolNotExist
}
p <- r
return nil
}

client

package objectpool

import (
"log"
"sync"
"testing"
)

func TestObjectPool(t *testing.T) {
size := 5
p := New(size)

doWork := func(workId int, wg *sync.WaitGroup) {
defer wg.Done()

res, err := p.Acquire()
if err != nil {
log.Println(err)
return
}
defer p.Release(res)
res.Do(workId)
}

num := 100
wg := &sync.WaitGroup{}
wg.Add(num)
for i := 0; i < num; i++ {
go doWork(i, wg)
}
wg.Wait()
}

Python

from queue import Queue, Empty
import time
import threading
from random import randint

get_res_max_time = 3

class ErrPoolNotExist(Exception):
pass

class ErrGetResTimeout(Exception):
pass

class Resource:
def __init__(self, id: int):
self.id = id
time.sleep(0.5)

def do(self, workid: int):
time.sleep(randint(1, 5) * 0.1)
print(f"using resource #{self.id} for work #{workid}")

class Pool:
def __init__(self, size: int):
self._queue = Queue(maxsize=size)
self._init_threads(size)

def _init_threads(self, size: int):
def create_resource(res_id: int):
resource = Resource(res_id)
self._queue.put(resource)

threads = []
for i in range(size):
t = threading.Thread(target=create_resource, args=(i,))
t.start()
threads.append(t)

for t in threads:
t.join()

def acquire(self) -> Resource:
try:
resource = self._queue.get(timeout=get_res_max_time)
return resource
except Empty:
raise ErrGetResTimeout("Get resource timeout")

def release(self, resource: Resource):
if not self.is_valid():
raise ErrPoolNotExist("Pool is not exist")
self._queue.put(resource)

def is_valid(self) -> bool:
return self._queue is not None

class ResourceContext:
def __init__(self, pool: Pool, resource: Resource):
self.pool = pool
self.resource = resource

def __enter__(self):
return self.resource

def __exit__(self, exc_type, exc_val, exc_tb):
self.pool.release(self.resource)
return False


if __name__ == "__main__":
pool = Pool(size=5)
workers = []

# def do_work(work_id: int, pool: Pool):
# try:
# resource = pool.acquire()
# resource.do(work_id)
# pool.release(resource)
# except ErrPoolNotExist:
# print(f"Pool is not exist")
# except ErrGetResTimeout:
# print(f"Get resource timeout")

def do_work(work_id: int, pool: Pool):
try:
with ResourceContext(pool, pool.acquire()) as resource:
resource.do(work_id)
except ErrPoolNotExist:
print(f"Pool is not exist")
except ErrGetResTimeout:
print(f"Get resource timeout")

for i in range(100):
t = threading.Thread(target=do_work, args=(i, pool))
t.start()
workers.append(t)

for i in workers:
i.join()