本文分享自华为云社区《Python 解析JSON实现主机管理》,作者: LyShark。

JSON(JavaScript Object Notation)是一种轻量级的数据交换格式,它以易于阅读和编写的文本形式表示数据。JSON 是一种独立于编程语言的数据格式,因此在不同的编程语言中都有对应的解析器和生成器。JSON 格式的设计目标是易于理解、支持复杂数据结构和具有良好的可扩展性。

JSON 数据是以键值对的形式存在的,而且易于阅读和编写。以下是一个简单的 JSON 示例:

{  "name": "John Doe",  "age": 30,  "city": "New York",  "isStudent": false,  "grades": [95, 88, 75, 92],  "address": {    "street": "123 Main St",    "zipCode": "10001"  }}

在这个例子中,JSON 对象包含了一些属性,包括字符串、数字、布尔值、数组和嵌套的对象。

  • "name": "John Doe":字符串键值对。
  • "age": 30:数字键值对。
  • "city": "New York":字符串键值对。
  • "isStudent": false:布尔键值对。
  • "grades": [95, 88, 75, 92]:数组键值对。
  • "address": {...}:嵌套对象。

在实际应用中,JSON 数据通常用于前后端之间的数据交换,或者配置文件的存储。各种编程语言都提供了处理JSON数据的库或模块。

很早之前大概是两年前,当时为了实现批量管理SSH账号密码并实现自动巡检功能,写过一个简单的命令行工具,通过使用JSON实现对特定主机账号密码与组的管理,如下代码,通过定义AdminDataBase()类,传如数据库文件名database.json实现对特定JSON文件的增删改查功能,在编写该案例后我对JSON的使用变得更加深刻了。

# -*- coding: utf-8 -*-import os,jsonclass AdminDataBase(object):    def __init__(self, database_path):        self.database_path = database_path    # 判断数据库文件是否存在,不存则则创建一个.    def InitDatabase(self):        if os.path.exists(self.database_path) != None :            init_database = \                {                    "HostList": [["1000", "127.0.0.1", "username", "password", "22"]],                    "HostGroup": [{"DefaultGroup": ["1000"]}, ],                }            with open(self.database_path, "w", encoding="utf-8") as fp:                fp.write(json.dumps(init_database))            print("[+] {} 结构已被初始化.".format(self.database_path))    # table 接收一个列表名,根据列表名输出列表中的数据    def ShowHostList(self):        with open(self.database_path, "r", encoding="utf-8") as Read_Pointer:            load_json = json.loads(Read_Pointer.read())            base = load_json.get("HostList")            print("-" * 80)            print("UUID \t 主机地址 \t\t\t 登录用户 \t\t 登录密码 \t 端口")            print("-" * 80)            for each in range(0, len(base)):                print("{0:4} \t {1:15} \t {2:10} \t {3:10} \t {4:10}"                      .format(base[each][0], base[each][1], base[each][2], base[each][3], base[each][4]))        print()    # 在原来的基础上添加一台新的主机,添加到HostList表中    def AddHost(self, uuid, address, username, password, port):        with open(self.database_path, "r", encoding="utf-8") as Read_Pointer:            # 读取 database.json 中的数据到内存中            load_json = json.loads(Read_Pointer.read())            # 先读入内存,修改后全部替换进去            host_list = load_json.get("HostList")            # 获取到最后一次循环的元素列表            for each in range(len(host_list) - 1, len(host_list)):                # 判断如果UUID不重复则添加,否则跳过添加                if (host_list[each][0] != uuid):                    host_list.append([uuid, address, username, password, port])                    load_json["HostList"] = host_list                    # 最后再次将改好的数据,回写到文件保存                    with open(self.database_path, "w", encoding="utf-8") as Write_Pointer:                        dump_json = json.dumps(load_json)                        Write_Pointer.write(dump_json)                        print("[+] UUID {} 添加成功.".format(uuid))                else:                    print("[-] UUID {} 列表中存在,添加失败.".format(uuid))                    return 0    # 根据传入UUID号修改特定主机数据    def ModifyHost(self,uuid,modify_address,modify_username,modify_password,modify_port):        with open(self.database_path, "r", encoding="utf-8") as Read_Pointer:            # 读取 database.json 中的数据到内存中            load_json = json.loads(Read_Pointer.read())            host_list = load_json.get("HostList")            # 根据uuid寻找一维数组的下标位置            index = -1            for index, value in enumerate(host_list):                if value[0] == uuid:                    print("[*] 已找到UUID {} 所在下标为 {}".format(uuid,index))                    break                else:                    index = -1            # 判断是否找到了,找到了则修改            if index != -1:                # 修改指定下标数值,并将修改后的数据放入原始JSON文件中                host_list[index] = [uuid,modify_address,modify_username,modify_password,modify_port]                load_json["HostList"] = host_list                # 最后再次将改好的数据,回写到文件保存                with open(self.database_path, "w", encoding="utf-8") as Write_Pointer:                    dump_json = json.dumps(load_json)                    Write_Pointer.write(dump_json)                    print("[+] UUID {} 修改完成.".format(uuid))                return 0            else:                print("[-] UUID {} 不存在.".format(uuid))        return 0    # 根据传入的UUID号删除主机数据,先删除所对的组中的数据,然后在删除主机数据    def DeleteHost(self,uuid):        with open(self.database_path, "r", encoding="utf-8") as Read_Pointer:            load_json = json.loads(Read_Pointer.read())            host_group = load_json.get("HostGroup")            # 首先在HostGroup中寻找并弹出相应UUID            for each in range(0,len(host_group)):                try:                    # 循环每个字典                    for k,v in host_group[each].items():                        # 判断UUID是否存在于每个字典中                        if v.count(uuid) != 0:                            # 移除并放入原始JSON中                            v.remove(uuid)                            load_json["HostGroup"] = v                            # 最后再次将改好的数据,回写到文件保存                            with open(self.database_path, "w", encoding="utf-8") as Write_Pointer:                                dump_json = json.dumps(load_json)                                Write_Pointer.write(dump_json)                                #print("[+] UUID {} 修改完成.".format(uuid))                        else:                            #print("[-] 当前组 {} 不能存在: {}".format(k,uuid))                            break                except Exception:                    print("[-] 当前组不能存在: {}".format(uuid))                    return 0            # ----------------------------------------------            # 其次寻找HostList中的数据并弹出            host_list = load_json.get("HostList")            try:                # 根据uuid寻找一维数组的下标位置                index = -1                for index, value in enumerate(host_list):                    if value[0] == uuid:                        #print("[*] 已找到UUID {} 所在下标为 {}".format(uuid,index))                        break                    else:                        index = -1                # 如果找到了,则直接根据下标弹出数据                if index != -1:                    host_list.pop(index)                load_json["HostList"] = host_list                # 最后再次将改好的数据,回写到文件保存                with open(self.database_path, "w", encoding="utf-8") as Write_Pointer:                    dump_json = json.dumps(load_json)                    Write_Pointer.write(dump_json)                print("[+] UUID {} 主机: {} 已被移除.".format(uuid,host_list[index][1]))                return 0            except Exception:                return 0    # 向数据库中的HostGroup字段中添加一个主机组    def AddHostGroup(self,add_group_name):        with open(self.database_path, "r", encoding="utf-8") as Read_Pointer:            load_json = json.loads( Read_Pointer.read() )            group_obj = load_json.get("HostGroup")            # 循环所有字典中的组            for each in range(0, len(group_obj)):                list_name = str(list(group_obj[each].keys())[0])                # print("组节点: {}".format(list_name))                # 循环判断表中是否存在指定的组名称                if (list_name == add_group_name):                    print("[-] {} 存在于组中,无法继续添加.".format(list_name))                    return 0            # 读取原始JSON文件,并构建回写参数            tmp = {}            tmp[add_group_name] = ["1000"]            group_obj.append(tmp)            # 回写添加主机组            with open(self.database_path, "w", encoding="utf-8") as Write_Pointer:                dump_json = json.dumps(load_json)                Write_Pointer.write(dump_json)            print("[+] 主机组 {} 已添加".format(add_group_name))    # 在主机组中删除一个主机组    def DeleteHostGroup(self,delete_group_name):        with open(self.database_path, "r", encoding="utf-8") as Read_Pointer:            load_json = json.loads( Read_Pointer.read() )            group_obj = load_json.get("HostGroup")            # 循环所有字典中的组            for each in range(0, len(group_obj)):                list_name = str(list(group_obj[each].keys())[0])                # 循环判断表中是否存在指定的组名称                if (list_name == delete_group_name):                    # 如果存在于组中,那我们就把他的each弹出列表                    group_obj.pop(each)                    # 将弹出后的列表再次写回JSON序列中                    with open(self.database_path, "w", encoding="utf-8") as Write_Pointer:                        dump_json = json.dumps(load_json)                        Write_Pointer.write(dump_json)                    print("[-] 主机组 {} 已移除.".format(delete_group_name))                    return 0            print("[-] 主机组 {} 不存在.".format(delete_group_name))        return 0    # 向指定主机组中添加一个主机UUID    def AddHostGroupOnUUID(self,group_name, uuid):        with open(self.database_path, "r", encoding="utf-8") as Read_Pointer:            load_json = json.loads( Read_Pointer.read() )            group_obj = load_json.get("HostGroup")            # 循环所有字典中的所有组            for each in range(0, len(group_obj)):                list_name = str(list(group_obj[each].keys())[0])                # 如果找到了需要添加的组                if (list_name == group_name):                    tmp = group_obj[each]                    # 获取到原始列表中的数据                    val = list(tmp.values())[0]                    # 判断输入的UUID号,是否在val列表中,不在则添加                    if str(uuid) not in val:                        val.append(str(uuid))                        # 将原始数据赋值到新的表中                        group_obj[each][list_name] = val                    else:                        print("[-] UUID {} 已存在于 {} 主机组,添加失败.".format(uuid,group_name))                        return 0        with open(self.database_path, "w", encoding="utf-8") as Write_Pointer:            dump_json = json.dumps(load_json)            Write_Pointer.write(dump_json)            print("[+] 向主机组 {} 增加UUID {} 完成".format(group_name, uuid))        return 0    # 从指定主机组中删除一个指定的UUID号    def DeleteHostGroupOnUUID(self,group_name, uuid):        with open(self.database_path, "r", encoding="utf-8") as Read_Pointer:            load_json =json.loads( Read_Pointer.read() )            group_obj = load_json.get("HostGroup")            # 循环所有字典中的组            for each in range(0, len(group_obj)):                list_name = str(list(group_obj[each].keys())[0])                # 先来寻找到需要删除的主机组                if (list_name == group_name):                    tmp = group_obj[each]                    # 寻找指定组中的指定列表                    val = list(tmp.values())[0]                    for x in range(0, len(val)):                        if (val[x] == uuid):                            print("[*] 搜索UUID: {} 索引值: {}".format(val[x], x))                            # 将要删除的UUID弹出列表                            val.pop(x)                            # 将弹出后的列表重新复制到主机组中                            group_obj[each][list_name] = val                            # 保存弹出后的列表到序列中                            with open(self.database_path, "w", encoding="utf-8") as write_fp:                                dump_json = json.dumps(load_json)                                write_fp.write(dump_json)                            print("[+] 从主机组 {} 弹出UUID {} 完成".format(group_name, uuid))                            return 0        return 0    # 输出所有主机组和全部节点信息    def ShowAllGroup(self):        with open(self.database_path, "r", encoding="utf-8") as Read_Pointer:            load_json = json.loads( Read_Pointer.read() )            group_obj = load_json.get("HostGroup")            # 循环解析所有组,并解析出UUID所对应的主机地址等信息            for each in range(0, len(group_obj)):                for k, v in group_obj[each].items():                    print("-" * 140)                    print("主机组: {}".format(k))                    print("-" * 140)                    for each in range(0, len(v)):                        # print("--> UUID: {}".format(v[each]))                        # 循环判断,拿着组内UUID判断是否存在,如果存在则打印出主机详细信息                        base_obj = load_json.get("HostList")                        for x in range(0, len(base_obj)):                            if (v[each] == base_obj[x][0]):                                print("UUID: {0:6} \t 主机地址: {1:15} \t 主机账号: {2:15} \t 主机密码: {3:15} \t 端口: {4:5} \t".                                      format(base_obj[x][0], base_obj[x][1], base_obj[x][2], base_obj[x][3],                                             base_obj[x][4]))                    print("\n")    # 只显示所有的主机组,包括组中主机数    def ShowGroup(self):        with open(self.database_path, "r", encoding="utf-8") as Read_Pointer:            load_json = json.loads( Read_Pointer.read() )            group_obj = load_json.get("HostGroup")            print("-" * 80)            print("{0:20} \t {1:5} \t {2:100}".format("主机组名","主机数","主机列表"))            print("-" * 80)            for each in range(0,len(group_obj)):                for k,v in group_obj[each].items():                    print("{0:20} \t {1:5} \t {2:100}".format(k,str(len(v)), str(v)))        print()        return 0    # 对指定的主机组进行Ping测试    def PingGroup(self,group_name):        with open(self.database_path, "r", encoding="utf-8") as Read_Pointer:            load_json = json.loads( Read_Pointer.read() )            group_obj = load_json.get("HostGroup")            # 循环遍历主机列表            for each in range(0, len(group_obj)):                for k, v in group_obj[each].items():                    # 寻找主机组,找到后提取出所有的Value                    if (k == group_name):                        item_list = v                        # 再循环,拿着v的值对base进行循环,提取出其中的用户名密码等                        for val in range(0, len(item_list)):                            # 得到循环列表:print(item_list[val])                            base_obj = load_json.get("HostList")                            # 循环base表中的计数器                            for base_count in range(0, len(base_obj)):                                # 判断base表中是否存在指定UUID,如果存在则输出其值                                if (base_obj[base_count][0] == item_list[val]):                                    print("地址: {} \t 用户名: {} [ok]".format(base_obj[base_count][1],base_obj[base_count][2]))if __name__ == "__main__":    db = AdminDataBase("database.json")    while True:        try:            cmd = str(input("[LyShell] # ")).split()            if (cmd == ""):                continue            elif (cmd[0] == "exit"):                exit(1)            elif (cmd[0] == "clear"):                os.system("cls")            elif(cmd[0] == "Init"):                db.InitDatabase()            elif(cmd[0] == "ShowHostList"):                db.ShowHostList()            elif(cmd[0] == "ShowGroup"):                db.ShowGroup()            elif(cmd[0] == "ShowAllGroup"):                db.ShowAllGroup()            elif(cmd[0] == "AddHost" or cmd[0] == "ModifyHost"):                if (len(cmd) - 1 >= 5):                    uuid = str(cmd[1]).split("=")[1]                    address = str(cmd[2]).split("=")[1]                    username = str(cmd[3]).split("=")[1]                    password = str(cmd[4]).split("=")[1]                    port = str(cmd[5]).split("=")[1]                    if cmd[0] == "AddHost":                        db.AddHost(uuid,address,username,password,port)                    elif cmd[0] == "ModifyHost":                        db.ModifyHost(uuid,address,username,password,port)            elif(cmd[0] == "DeleteHost"):                if(len(cmd)-1 >= 1):                    uuid = str(cmd[1]).split("=")[1]                    db.DeleteHost(uuid)            elif(cmd[0] == "AddHostGroup"):                group_name = str(cmd[1]).split("=")[1]                db.AddHostGroup(group_name)            elif(cmd[0] == "DeleteHostGroup"):                group_name = str(cmd[1]).split("=")[1]                db.DeleteHostGroup(group_name)            elif(cmd[0] == "AddHostGroupOnUUID"):                group_name = str(cmd[1]).split("=")[1]                uuid = str(cmd[2]).split("=")[1]                db.AddHostGroupOnUUID(group_name,uuid)            elif(cmd[0] == "DelHostGroupOnUUID"):                group_name = str(cmd[1]).split("=")[1]                uuid = str(cmd[2]).split("=")[1]                db.DeleteHostGroupOnUUID(group_name,uuid)            elif(cmd[0] == "PingGroup"):                group_name = str(cmd[1]).split("=")[1]                db.PingGroup(group_name)            elif (cmd[0] == "help"):                print("By: LyShark")            else:                print("Not Command")        except Exception:            continue

使用案例

管理数据库(AdminDataBase)中的主机信息和主机分组信息。用户通过输入命令来执行不同的操作,如初始化数据库、显示主机列表、添加主机、修改主机信息、删除主机等。

以下是代码的主要功能和命令列表:

  • 初始化数据库:Init
  • 显示主机列表:ShowHostList
  • 显示主机分组:ShowGroup
  • 显示所有主机分组:ShowAllGroup
  • 添加主机:AddHost
  • 修改主机信息:ModifyHost
  • 删除主机:DeleteHost
  • 添加主机分组:AddHostGroup
  • 删除主机分组:DeleteHostGroup
  • 将主机添加到指定分组:AddHostGroupOnUUID
  • 从指定分组删除主机:DelHostGroupOnUUID
  • 按组执行 Ping 操作:PingGroup
  • 清屏:clear
  • 退出程序:exit
  • 帮助:help

Init

初次使用需要执行本命令,对数据库文件进行写出,如下所示;

ShowHostList

用于输出当前主机列表信息,如下图所示;

ShowGroup

用于输出当前主机组,如下图所示;

ShowAllGroup

用于输出所有的主机组以及组内的主机详细信息,如下图所示;

AddHost

添加一个新的主机记录,如下图所示;

ModifyHost

修改一个已有的主机记录,此处以UUID作为修改条件,如下图所示;

DeleteHost

根据一个UUID唯一标识,删除一个已存在的主机记录,如下图所示;

AddHostGroup

新增一个组名,默认会携带1000为初始主机,如下图所示;

DeleteHostGroup

删除一整个主机组,如下图所示;

AddHostGroupOnUUID

根据UUID号将特定主机添加到特定组内,如下图所示;

DelHostGroupOnUUID

根据主机组名,删除特定的UUID,如下图所示;

PingGroup

对特定主机组执行Ping功能测试,此处可以扩展,如下图所示;

总结部分

该案例只是用于学习如何灵活运用JSON实现数据的增删改查,其实在实战中意义不大,因为完全可以使用SQLite这类精简数据库,此案例只是本人为了熟悉JSON的增删改查而写的一个Demo工具。

点击关注,第一时间了解华为云新鲜技术~