侯体宗的博客
  • 首页
  • Hyperf版
  • beego仿版
  • 人生(杂谈)
  • 技术
  • 关于我
  • 更多分类
    • 文件下载
    • 文字修仙
    • 中国象棋ai
    • 群聊
    • 九宫格抽奖
    • 拼图
    • 消消乐
    • 相册

Python对ElasticSearch获取数据及操作

Python  /  管理员 发布于 5年前   352

使用Python对ElasticSearch获取数据及操作,供大家参考,具体内容如下

Version

Python :2.7

ElasticSearch:6.3

代码:

#!/usr/bin/env python# -*- coding: utf-8 -*-"""  @Time  : 2018/7/4  @Author : LiuXueWen  @Site  :   @File  : ElasticSearchOperation.py  @Software: PyCharm  @Description: 对elasticsearch数据的操作,包括获取数据,发送数据"""import elasticsearchimport jsonimport Util_Ini_Operationclass elasticsearch_data():  def __init__(self,hosts,username,password,maxsize,is_ssl):    # 初始化ini操作脚本,获取配置文件    try:      # 判断请求方式是否ssl加密      if is_ssl == "true":        # 获取证书地址        cert_pem = Util_Ini_Operation.get_ini("config.ini").get_key_value("certs","certs")        es_ssl = elasticsearch.Elasticsearch(          # 地址          hosts=hosts,          # 用户名密码          http_auth=(username,password),          # 开启ssl          use_ssl=True,          # 确认有加密证书          verify_certs=True,          # 对应的加密证书地址          client_cert=cert_pem        )        self.es = es_ssl      elif is_ssl == "false":        # 创建普通类型的ES客户端        es_ordinary = elasticsearch.Elasticsearch(hosts, http_auth=(username, password), maxsize=int(maxsize))        self.es = es_ordinary    except Exception as e:      print(e)  def query_data(self,keywords_list,date):    gte = "now-"+str(date)    query_data = {      # 查询语句      "query": {        "bool": {          "must": [            {              "query_string": {                "query": keywords_list,                "analyze_wildcard": True              }            },            {              "range": {                "@timestamp": {                  "gte": gte,                  "lte": "now",                  "format": "epoch_millis"                }              }            }          ],          "must_not": []        }      }    }    return query_data  # 从es获取数据  def get_datas_by_query(self,index_name,keywords,param,date):    '''    :param index_name: 索引名称    :param keywords: 关键字词,数组    :param param: 需要数据条件,例如_source    :param date: 过去时间范围,字符串格式,例如过去30分钟内数据,"30m"    :return: all_datas 返回查询到的所有数据(已经过param过滤)    '''    all_datas = []    # 遍历所有的查询条件    for keywords_list in keywords:      # DSL语句      query_data = self.query_data(keywords_list,date)      res = self.es.search(        index=index_name,        body=query_data      )      for hit in res['hits']['hits']:        # 获取指定的内容        response = hit[param]        # 添加所有数据到数据集中        all_datas.append(response)    # 返回所有数据内容    return all_datas  # 当索引不存在创建索引  def create_index(self,index_name):    '''    :param index_name: 索引名称    :return:如果创建成功返回创建结果信息,试过已经存在创建新的index失败返回index的名称    '''    # 获取索引的映射    # index_mapping = IndexMapping.index_mapping    # # 判断索引是否存在    # if self.es.indices.exists(index=index_name) is not True:    #   # 创建索引    #   res = self.es.indices.create(index=index_name,body=index_mapping)    #   # 返回结果    #   return res    # else:    #   # 返回索引名称    #   return index_name    pass  # 插入指定的单条数据内容  def insert_single_data(self,index_name,doc_type,data):    '''    :param index_name: 索引名称    :param doc_type: 文档类型    :param data: 需要插入的数据内容    :return: 执行结果    '''    res = self.es.index(index=index_name,doc_type=doc_type,body=data)    return res  # 向ES中新增数据,批量插入  def insert_datas(self,index_name):    '''    :desc 通过读取指定的文件内容获取需要插入的数据集    :param index_name: 索引名称    :return: 插入成功的数据条数    '''    insert_datas = []    # 判断插入数据的索引是否存在    self.createIndex(index_name=index_name)    # 获取插入数据的文件地址    data_file_path = self.ini.get_key_value("datafile","datafilepath")    # 获取需要插入的数据集    with open(data_file_path,"r+") as data_file:      # 获取文件所有数据      data_lines = data_file.readlines()      for data_line in data_lines:        # string to json        data_line = json.loads(data_line)        insert_datas.append(data_line)    # 批量处理    res = self.es.bulk(index=index_name,body=insert_datas,raise_on_error=True)    return res  # 从ES中在指定的索引中删除指定数据(根据id判断)  def delete_data_by_id(self,index_name,doc_type,id):    '''    :param index_name: 索引名称    :param index_type: 文档类型    :param id: 唯一标识id    :return: 删除结果信息    '''    res = self.es.delete(index=index_name,doc_type=doc_type,id=id)    return res  # 根据条件删除数据  def delete_data_by_query(self,index_name,doc_type,param,gt_time,lt_time):    '''    :param index_name:索引名称,为空查询所有索引    :param doc_type:文档类型,为空查询所有文档类型    :param param:过滤条件值    :param gt_time:时间范围,大于该时间    :param lt_time:时间范围,小于该时间    :return:执行条件删除后的结果信息    '''    # DSL语句    query_data = {      # 查询语句      "query": {        "bool": {          "must": [            {              "query_string": {                "query": param,                "analyze_wildcard": True              }            },            {              "range": {                "@timestamp": {                  "gte": gt_time,                  "lte": lt_time,                  "format": "epoch_millis"                }              }            }          ],          "must_not": []        }      }    }    res = self.es.delete_by_query(index=index_name,doc_type=doc_type,body=query_data,_source=True)    return res  # 指定index中删除指定时间段内的全部数据  def delete_all_datas(self,index_name,doc_type,gt_time,lt_time):    '''    :param index_name:索引名称,为空查询所有索引    :param doc_type:文档类型,为空查询所有文档类型    :param gt_time:时间范围,大于该时间    :param lt_time:时间范围,小于该时间    :return:执行条件删除后的结果信息    '''    # DSL语句    query_data = {      # 查询语句      "query": {        "bool": {          "must": [            {              "match_all": {}            },            {              "range": {                "@timestamp": {                  "gte": gt_time,                  "lte": lt_time,                  "format": "epoch_millis"                }              }            }          ],          "must_not": []        }      }    }    res = self.es.delete_by_query(index=index_name, doc_type=doc_type, body=query_data, _source=True)    return res  # 修改ES中指定的数据  def update_data_by_id(self,index_name,doc_type,id,data):    '''    :param index_name: 索引名称    :param doc_type: 文档类型,为空表示所有类型    :param id: 文档唯一标识编号    :param data: 更新的数据    :return: 更新结果信息    '''    res = self.es.update(index=index_name,doc_type=doc_type,id=id,body=data)    return res

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持。


  • 上一条:
    python elasticsearch环境搭建详解
    下一条:
    linux php-fpm 如何重启
  • 昵称:

    邮箱:

    0条评论 (评论内容有缓存机制,请悉知!)
    最新最热
    • 分类目录
    • 人生(杂谈)
    • 技术
    • linux
    • Java
    • php
    • 框架(架构)
    • 前端
    • ThinkPHP
    • 数据库
    • 微信(小程序)
    • Laravel
    • Redis
    • Docker
    • Go
    • swoole
    • Windows
    • Python
    • 苹果(mac/ios)
    • 相关文章
    • 在python语言中Flask框架的学习及简单功能示例(0个评论)
    • 在Python语言中实现GUI全屏倒计时代码示例(0个评论)
    • Python + zipfile库实现zip文件解压自动化脚本示例(0个评论)
    • python爬虫BeautifulSoup快速抓取网站图片(1个评论)
    • vscode 配置 python3开发环境的方法(0个评论)
    • 近期文章
    • 在go中实现一个常用的先进先出的缓存淘汰算法示例代码(0个评论)
    • 在go+gin中使用"github.com/skip2/go-qrcode"实现url转二维码功能(0个评论)
    • 在go语言中使用api.geonames.org接口实现根据国际邮政编码获取地址信息功能(1个评论)
    • 在go语言中使用github.com/signintech/gopdf实现生成pdf分页文件功能(0个评论)
    • gmail发邮件报错:534 5.7.9 Application-specific password required...解决方案(0个评论)
    • 欧盟关于强迫劳动的规定的官方举报渠道及官方举报网站(0个评论)
    • 在go语言中使用github.com/signintech/gopdf实现生成pdf文件功能(0个评论)
    • Laravel从Accel获得5700万美元A轮融资(0个评论)
    • 在go + gin中gorm实现指定搜索/区间搜索分页列表功能接口实例(0个评论)
    • 在go语言中实现IP/CIDR的ip和netmask互转及IP段形式互转及ip是否存在IP/CIDR(0个评论)
    • 近期评论
    • 122 在

      学历:一种延缓就业设计,生活需求下的权衡之选中评论 工作几年后,报名考研了,到现在还没认真学习备考,迷茫中。作为一名北漂互联网打工人..
    • 123 在

      Clash for Windows作者删库跑路了,github已404中评论 按理说只要你在国内,所有的流量进出都在监控范围内,不管你怎么隐藏也没用,想搞你分..
    • 原梓番博客 在

      在Laravel框架中使用模型Model分表最简单的方法中评论 好久好久都没看友情链接申请了,今天刚看,已经添加。..
    • 博主 在

      佛跳墙vpn软件不会用?上不了网?佛跳墙vpn常见问题以及解决办法中评论 @1111老铁这个不行了,可以看看近期评论的其他文章..
    • 1111 在

      佛跳墙vpn软件不会用?上不了网?佛跳墙vpn常见问题以及解决办法中评论 网站不能打开,博主百忙中能否发个APP下载链接,佛跳墙或极光..
    • 2016-10
    • 2016-11
    • 2018-04
    • 2020-03
    • 2020-04
    • 2020-05
    • 2020-06
    • 2022-01
    • 2023-07
    • 2023-10
    Top

    Copyright·© 2019 侯体宗版权所有· 粤ICP备20027696号 PHP交流群

    侯体宗的博客