沪ICP备2021032517号-1

Logstash迁移ES数据

  |   0 评论   |   0 浏览

全量迁移

input {
  elasticsearch {
  hosts => [ "10.10.0.249:9200" ]
  user => "admin"
  password => "123456"
  index => "source_access_log"
  size => 1000
  scroll => "1m"
  docinfo => true
  docinfo_fields => ["_routing","_index","_type","_id"]
  }
}

filter {

  mutate { 
  remove_field => ["@timestamp","@version"]
  }

}

output {
  elasticsearch {
  hosts => [ "10.10.1.94:9200" ]
  document_type => "%{[@metadata][_type]}"
  index => "source_access_log"
  user => "admin"
  password => "123456"
  }

}
  1. scroll:指定Elasticsearch的滚动查询的超时时间。滚动查询是一种在大量数据中分批获取数据的方法。
  2. slices:指定并行处理的切片数量,以提高大规模数据的处理性能。
  3. size:指定每次滚动查询返回的最大文档数。默认值为1000。
  4. docinfo: 如果将docinfo参数设置为true,则Logstash会在事件中添加一些元数据字段,以提供有关源文档的信息。这些字段包括_index(索引名称)、_type(文档类型)、_id(文档ID)和_version(文档版本)。

按时间范围迁移

如索引中时间字段为:loggingTime

input {
  elasticsearch {
  hosts => [ "10.10.0.249:9200" ]
  user => "admin"
  password => "123456"
  index => "source_access_log"
  query => '{"query":{"range":{"loggingTime":{"gte":"1691251200000","lte":"1691337599000"}}}}'
  size => 1000
  scroll => "1m"
  docinfo => true
  docinfo_fields => ["_routing","_index","_type","_id"]
  }
}

filter {

  mutate { 
  remove_field => ["@timestamp","@version"]
  }

}

output {
  elasticsearch {
  hosts => [ "10.10.1.94:9200" ]
  document_type => "%{[@metadata][_type]}"
  index => "source_access_log"
  user => "admin"
  password => "123456"
  }

}

数据验证

POST source_access_log/_count
{
  "query": {
    "bool": {
      "must": {
        "range": {
          "loggingTime": {
            "gte": 1691078400000,
            "lte": 1691164799000
          }
        }
      }
    }
  }
}

按时间排序查询

GET source_access_log/_search
{
  "size": 1,
  "sort": [
    {
      "loggingTime": {
        "order": "desc"
      }
    }
  ]
}

根据查询条件迁移

input {
  elasticsearch {
  hosts => [ "10.10.4.20:9200" ]
  user => "elastic"
  password => "12356"
  index => "user-app-prod-2023.12.19"
  #query => '{"query":{"range":{"@timestamp":{"gte":"2023-12-18T23:20:00.000Z","lte":"2023-12-19T00:10:00.000Z"}}}}'
  query => '{
  "sort": [
    {
      "@timestamp": {
        "order": "desc"
      }
    }
  ],
  "query": {
    "bool": {
      "filter": [
        {
          "match_phrase": {
            "message": "/v1.0/account/refreshToken"
          }
        },
        {
          "range": {
            "@timestamp": {
              "gte": "2023-12-18T23:20:00.000Z",
              "lte": "2023-12-19T00:10:00.000Z"
            }
          }
        }
      ]
    }
  }
}'
  size => 1000
  scroll => "1m"
  docinfo => true
  docinfo_fields => ["_routing","_index","_type","_id"]
  }
}

filter {

  grok {
    match => { "message" => '"beanId":%{NUMBER:extracted_beanId}' }
  }

}

output {
 
  #stdout {codec=>rubydebug{}}
  elasticsearch {
  document_type => "%{[@metadata][_type]}"
  hosts => [ "10.10.4.20:9200" ]
  user => "elastic"
  password => "123456"
  index => "user-auth-service-bak-2023.12.19"
  }
}

保持元数据一致迁移

在使用logstash迁移数据且要保持元数据一致的情况下,比如某些场景需要保持 元数据 _id 字段值一致。

在output中直接使用:document_type => "%{[@metadata][_type]}" 可能不生效

output {
  #stdout {codec=>rubydebug{metadata => true}}
  elasticsearch {
  hosts => [ "10.10.1.94:9200" ]
  document_id => "%{[@metadata][_id]}"
  index => "source_access_log"
  user => "admin"
  password => "123456"
  }

}

在logstash开启debug模式(stdout {codec=>rubydebug{metadata => true}})观察后发现原数据字段结构有差别:

{
             "cityCode" => "430500",
             "latitude" => 27.12085,
                  "swp" => [],
             "isEnable" => true,
        "sourceStoreId" => "94491",
            "isDeleted" => true,
            "closeTime" => "18:00",
    "addressLongitude1" => 111.052255,
    "addressLongitude0" => 111.058828,
           "sourceCpId" => "XC001",
            "storeName" => "汽车生活馆",
                   "id" => 182297,
             "openTime" => "08:00",
         "sourceCpName" => "盛大",
            "longitude" => 111.046941,
            "storeType" => 3,
           "coordinate" => [
        [0] 111.046941,
        [1] 27.12085
    ],
          "contactName" => "老板",
         "provinceCode" => "430000",
            "@metadata" => {
        "input" => {
            "elasticsearch" => {
                 "_type" => "_doc",
                   "_id" => "182297",
                "_index" => "ccc_store"
            }
        }
    },
             "storeNmu" => "430582012",
       "storeRunStatus" => "3",
     "addressLatitude0" => 27.123285,
           "countyCode" => "430582",
     "addressLatitude1" => 27.117615
}

此时

document_id => "%{[@metadata][_id]}"

应该改成

document_id => "%{[@metadata][input][elasticsearch][_id]}"

以多条件查询内容导出文件

input {
  elasticsearch {
  hosts => [ "10.10.4.20:9200" ]
  user => "elastic"
  password => "123456"
  index => "app-prod-2024.02.22"
  query => '{
  "sort": [
    {
      "@timestamp": {
        "order": "desc"
      }
    }
  ],
  "query": {
    "bool": {
      "must": [
        {
          "range": {
            "@timestamp": {
              "gte": "2024-02-22T06:00:00.000Z",
              "lte": "2024-02-22T07:00:00.000Z"
            }
          }
        },
        {
          "bool": {
            "should": [
              {
                "bool": {
                  "must": [
                    {
                      "match": {
                        "logger": "c.n.w.n.func.Task"
                      }
                    },
                    {
                      "match": {
                        "logger_line": "84"
                      }
                    }
                  ]
                }
              },
              {
                "bool": {
                  "must": [
                    {
                      "match": {
                        "logger": "c.n.w.n.func.Task"
                      }
                    },
                    {
                      "match": {
                        "logger_line": "14"
                      }
                    }
                  ]
                }
              },
              {
                "bool": {
                  "must": [
                    {
                      "match": {
                        "logger": "c.n.w.n.s.PlatformAnswerServiceImpl"
                      }
                    }
                  ]
                }
              }
            ]
          }
        }
      ]
    }
  }
}'
  size => 10000
  scroll => "10m"
  }
}

filter {

   json{
     source => "message"
    }
      mutate {
           remove_field => "tags"
      }

}

output {
  #stdout {codec=>rubydebug{}}
file {
        path => "/data/app.log"
        flush_interval => "0"
}
}

标题:Logstash迁移ES数据
作者:zifuy
地址:https://www.zifuy.cn/articles/2023/08/09/1691548567202.html