侯体宗的博客
  • 首页
  • Hyperf版
  • beego仿版
  • 人生(杂谈)
  • 技术
  • 关于我
  • 更多分类
    • 文件下载
    • 文字修仙
    • 中国象棋ai
    • 群聊
    • 九宫格抽奖
    • 拼图
    • 消消乐
    • 相册

Django文件存储 自己定制存储系统解析

框架(架构)  /  管理员 发布于 7年前   145

要自己写一个存储系统,可以依照以下步骤:

1.写一个继承自django.core.files.storage.Storage的子类。

from django.core.files.storage import Storageclass MyStorage(Storage):  ...

2.Django必须可以在无任何参数的情况下实例化MyStorage,所以任何环境设置必须来自django.conf.settings。

from django.conf import settingsfrom django.core.files.storage import Storage class MyStorage(Storage):  def __init__(self, option=None):    if not option:      option = settings.CUSTOM_STORAGE_OPTIONS    ...

3.根据Storage的open和save方法源码:

def open(self, name, mode='rb'):  """  Retrieves the specified file from storage.  """  return self._open(name, mode)  def save(self, name, content, max_length=None):  """  Saves new content to the file specified by name. The content should be  a proper File object or any python file-like object, ready to be read  from the beginning.  """  # Get the proper name for the file, as it will actually be saved.  if name is None:    name = content.name   if not hasattr(content, 'chunks'):    content = File(content, name)   name = self.get_available_name(name, max_length=max_length)  return self._save(name, content)

MyStorage需要实现_open和_save方法。

如果写的是个本地存储系统,还要重写path方法。

4.使用django.utils.deconstruct.deconstructible装饰器,以便在migration可以序列化。

还有,Storage.delete()、Storage.exists()、Storage.listdir()、Storage.size()、Storage.url()方法都会报NotImplementedError,也需要重写。

Django Qiniu Storage

七牛云有自己的django storage系统,可以看下是怎么运作的,地址 https://github.com/glasslion/django-qiniu-storage 。

先在环境变量或者settings中配置QINIU_ACCESS_KEY、QINIU_SECRET_KEY、QINIU_BUCKET_NAME、QINIU_BUCKET_DOMAIN、QINIU_SECURE_URL。

使用七牛云托管用户上传的文件,在 settings.py 里设置DEFAULT_FILE_STORAGE:

DEFAULT_FILE_STORAGE = 'qiniustorage.backends.QiniuStorage'

使用七牛托管动态生成的文件以及站点自身的静态文件,设置:

STATICFILES_STORAGE = 'qiniustorage.backends.QiniuStaticStorage'

运行python manage.py collectstatic,静态文件就会被统一上传到七牛。

QiniuStorage代码如下:

@deconstructibleclass QiniuStorage(Storage):  """  Qiniu Storage Service  """  location = ""   def __init__(      self,      access_key=QINIU_ACCESS_KEY,      secret_key=QINIU_SECRET_KEY,      bucket_name=QINIU_BUCKET_NAME,      bucket_domain=QINIU_BUCKET_DOMAIN,      secure_url=QINIU_SECURE_URL):     self.auth = Auth(access_key, secret_key)    self.bucket_name = bucket_name    self.bucket_domain = bucket_domain    self.bucket_manager = BucketManager(self.auth)    self.secure_url = secure_url   def _clean_name(self, name):    """    Cleans the name so that Windows style paths work    """    # Normalize Windows style paths    clean_name = posixpath.normpath(name).replace('\\', '/')     # os.path.normpath() can strip trailing slashes so we implement    # a workaround here.    if name.endswith('/') and not clean_name.endswith('/'):      # Add a trailing slash as it was stripped.      return clean_name + '/'    else:      return clean_name   def _normalize_name(self, name):    """    Normalizes the name so that paths like /path/to/ignored/../foo.txt    work. We check to make sure that the path pointed to is not outside    the directory specified by the LOCATION setting.    """     base_path = force_text(self.location)    base_path = base_path.rstrip('/')     final_path = urljoin(base_path.rstrip('/') + "/", name)     base_path_len = len(base_path)    if (not final_path.startswith(base_path) or        final_path[base_path_len:base_path_len + 1] not in ('', '/')):      raise SuspiciousOperation("Attempted access to '%s' denied." %       name)    return final_path.lstrip('/')   def _open(self, name, mode='rb'):    return QiniuFile(name, self, mode)   def _save(self, name, content):    cleaned_name = self._clean_name(name)    name = self._normalize_name(cleaned_name)     if hasattr(content, 'chunks'):      content_str = b''.join(chunk for chunk in content.chunks())    else:      content_str = content.read()     self._put_file(name, content_str)    return cleaned_name   def _put_file(self, name, content):    token = self.auth.upload_token(self.bucket_name)    ret, info = put_data(token, name, content)    if ret is None or ret['key'] != name:      raise QiniuError(info)   def _read(self, name):    return requests.get(self.url(name)).content   def delete(self, name):    name = self._normalize_name(self._clean_name(name))    if six.PY2:      name = name.encode('utf-8')    ret, info = self.bucket_manager.delete(self.bucket_name, name)     if ret is None or info.status_code == 612:      raise QiniuError(info)   def _file_stat(self, name, silent=False):    name = self._normalize_name(self._clean_name(name))    if six.PY2:      name = name.encode('utf-8')    ret, info = self.bucket_manager.stat(self.bucket_name, name)    if ret is None and not silent:      raise QiniuError(info)    return ret   def exists(self, name):    stats = self._file_stat(name, silent=True)    return True if stats else False   def size(self, name):    stats = self._file_stat(name)    return stats['fsize']   def modified_time(self, name):    stats = self._file_stat(name)    time_stamp = float(stats['putTime']) / 10000000    return datetime.datetime.fromtimestamp(time_stamp)   def listdir(self, name):    name = self._normalize_name(self._clean_name(name))    if name and not name.endswith('/'):      name += '/'     dirlist = bucket_lister(self.bucket_manager, self.bucket_name,    prefix=name)    files = []    dirs = set()    base_parts = name.split("/")[:-1]    for item in dirlist:      parts = item['key'].split("/")      parts = parts[len(base_parts):]      if len(parts) == 1:        # File        files.append(parts[0])      elif len(parts) > 1:        # Directory        dirs.add(parts[0])    return list(dirs), files   def url(self, name):    name = self._normalize_name(self._clean_name(name))    name = filepath_to_uri(name)    protocol = u'https://' if self.secure_url else u'http://'    return urljoin(protocol + self.bucket_domain, name)

配置是从环境变量或者settings.py中获得的:

def get_qiniu_config(name, default=None):  """  Get configuration variable from environment variable  or django setting.py  """  config = os.environ.get(name, getattr(settings, name, default))  if config is not None:    if isinstance(config, six.string_types):      return config.strip()    else:      return config  else:    raise ImproperlyConfigured(      "Can't find config for '%s' either in environment"      "variable or in setting.py" % name) QINIU_ACCESS_KEY = get_qiniu_config('QINIU_ACCESS_KEY')QINIU_SECRET_KEY = get_qiniu_config('QINIU_SECRET_KEY')QINIU_BUCKET_NAME = get_qiniu_config('QINIU_BUCKET_NAME')QINIU_BUCKET_DOMAIN = get_qiniu_config('QINIU_BUCKET_DOMAIN', '').rstrip('/')QINIU_SECURE_URL = get_qiniu_config('QINIU_SECURE_URL', 'False')

重写了_open和_save方法:

def _open(self, name, mode='rb'):  return QiniuFile(name, self, mode) def _save(self, name, content):  cleaned_name = self._clean_name(name)  name = self._normalize_name(cleaned_name)   if hasattr(content, 'chunks'):    content_str = b''.join(chunk for chunk in content.chunks())  else:    content_str = content.read()   self._put_file(name, content_str)  return cleaned_name

使用的put_data方法上传文件,相关代码如下:

def put_data(    up_token, key, data, params=None, mime_type='application/octet-stream', check_crc=False, progress_handler=None,    fname=None):  """上传二进制流到七牛   Args:    up_token:     上传凭证    key:       上传文件名    data:       上传二进制流    params:      自定义变量,规格参考 http://developer.qiniu.com/docs/v6/api/overview/up/response/vars.html#xvar    mime_type:    上传数据的mimeType    check_crc:    是否校验crc32    progress_handler: 上传进度   Returns:    一个dict变量,类似 {"hash": "<Hash string>", "key": "<Key string>"}    一个ResponseInfo对象  """  crc = crc32(data) if check_crc else None  return _form_put(up_token, key, data, params, mime_type, crc, progress_handler, fname) def _form_put(up_token, key, data, params, mime_type, crc, progress_handler=None, file_name=None):  fields = {}  if params:    for k, v in params.items():      fields[k] = str(v)  if crc:    fields['crc32'] = crc  if key is not None:    fields['key'] = key   fields['token'] = up_token  url = config.get_default('default_zone').get_up_host_by_token(up_token) + '/'  # name = key if key else file_name   fname = file_name  if not fname or not fname.strip():    fname = 'file_name'   r, info = http._post_file(url, data=fields, files={'file': (fname, data, mime_type)})  if r is None and info.need_retry():    if info.connect_failed:      url = config.get_default('default_zone').get_up_host_backup_by_token(up_token) + '/'    if hasattr(data, 'read') is False:      pass    elif hasattr(data, 'seek') and (not hasattr(data, 'seekable') or data.seekable()):      data.seek(0)    else:      return r, info    r, info = http._post_file(url, data=fields, files={'file': (fname, data, mime_type)})   return r, info def _post_file(url, data, files):  return _post(url, data, files, None) def _post(url, data, files, auth, headers=None):  if _session is None:    _init()  try:    post_headers = _headers.copy()    if headers is not None:      for k, v in headers.items():        post_headers.update({k: v})    r = _session.post(      url, data=data, files=files, auth=auth, headers=post_headers,      timeout=config.get_default('connection_timeout'))  except Exception as e:    return None, ResponseInfo(None, e)  return __return_wrapper(r) def _init():  session = requests.Session()  adapter = requests.adapters.HTTPAdapter(    pool_connections=config.get_default('connection_pool'), pool_maxsize=config.get_default('connection_pool'),    max_retries=config.get_default('connection_retries'))  session.mount('http://', adapter)  global _session  _session = session

最终使用的是requests库上传文件的,统一适配了链接池个数、链接重试次数。

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持。


  • 上一条:
    Django缓存系统实现过程解析
    下一条:
    如何在Django配置文件里配置session链接
  • 昵称:

    邮箱:

    0条评论 (评论内容有缓存机制,请悉知!)
    最新最热
    • 分类目录
    • 人生(杂谈)
    • 技术
    • linux
    • Java
    • php
    • 框架(架构)
    • 前端
    • ThinkPHP
    • 数据库
    • 微信(小程序)
    • Laravel
    • Redis
    • Docker
    • Go
    • swoole
    • Windows
    • Python
    • 苹果(mac/ios)
    • 相关文章
    • Filament v3.1版本发布(0个评论)
    • docker + gitea搭建一个git服务器流程步骤(0个评论)
    • websocket的三种架构方式使用优缺点浅析(0个评论)
    • ubuntu20.4系统中宿主机安装nginx服务,docker容器中安装php8.2实现运行laravel10框架网站(0个评论)
    • phpstudy_pro(小皮面板)中安装最新php8.2.9版本流程步骤(0个评论)
    • 近期文章
    • 在go中实现一个常用的先进先出的缓存淘汰算法示例代码(0个评论)
    • 在go+gin中使用"github.com/skip2/go-qrcode"实现url转二维码功能(0个评论)
    • 在go语言中使用api.geonames.org接口实现根据国际邮政编码获取地址信息功能(1个评论)
    • 在go语言中使用github.com/signintech/gopdf实现生成pdf分页文件功能(0个评论)
    • gmail发邮件报错:534 5.7.9 Application-specific password required...解决方案(0个评论)
    • 欧盟关于强迫劳动的规定的官方举报渠道及官方举报网站(0个评论)
    • 在go语言中使用github.com/signintech/gopdf实现生成pdf文件功能(0个评论)
    • Laravel从Accel获得5700万美元A轮融资(0个评论)
    • 在go + gin中gorm实现指定搜索/区间搜索分页列表功能接口实例(0个评论)
    • 在go语言中实现IP/CIDR的ip和netmask互转及IP段形式互转及ip是否存在IP/CIDR(0个评论)
    • 近期评论
    • 122 在

      学历:一种延缓就业设计,生活需求下的权衡之选中评论 工作几年后,报名考研了,到现在还没认真学习备考,迷茫中。作为一名北漂互联网打工人..
    • 123 在

      Clash for Windows作者删库跑路了,github已404中评论 按理说只要你在国内,所有的流量进出都在监控范围内,不管你怎么隐藏也没用,想搞你分..
    • 原梓番博客 在

      在Laravel框架中使用模型Model分表最简单的方法中评论 好久好久都没看友情链接申请了,今天刚看,已经添加。..
    • 博主 在

      佛跳墙vpn软件不会用?上不了网?佛跳墙vpn常见问题以及解决办法中评论 @1111老铁这个不行了,可以看看近期评论的其他文章..
    • 1111 在

      佛跳墙vpn软件不会用?上不了网?佛跳墙vpn常见问题以及解决办法中评论 网站不能打开,博主百忙中能否发个APP下载链接,佛跳墙或极光..
    • 2018-05
    • 2020-02
    • 2020-03
    • 2020-05
    • 2020-06
    • 2020-07
    • 2020-08
    • 2020-11
    • 2021-03
    • 2021-09
    • 2021-10
    • 2021-11
    • 2022-01
    • 2022-02
    • 2022-03
    • 2022-08
    • 2023-08
    • 2023-10
    • 2023-12
    Top

    Copyright·© 2019 侯体宗版权所有· 粤ICP备20027696号 PHP交流群

    侯体宗的博客