Joo's
반응형

파이썬에서 쿠버네티스 api로 configmap을 읽으면 text로 읽는다.

이걸 yaml으로 읽으려면 yaml.dump를 하면 된다.

cm = v1.read_namespaced_config_map(name=configmap_name, namespace=namespace)
cm_yaml = yaml.dump(cm.to_dict(), allow_unicode=True)

yaml을 json으로 읽으려면 yaml을 다시 json으로 읽으면 된다.

cm_json = yaml.load(cm_yaml,Loader=yaml.CLoader)
def datetime_converter(o):
    if isinstance(o, datetime):
        return o.isoformat()
    raise TypeError("Object of type '%s' is not JSON serializable" % type(o).__name__)

def get_configmap_as_json(namespace, configmap_name):
    try:
        # 특정 네임스페이스의 ConfigMap을 가져옵니다.
        cm = v1.read_namespaced_config_map(name=configmap_name, namespace=namespace)
        # 가져온 ConfigMap 객체를 YAML 문자열로 변환합니다.
        cm_yaml = yaml.dump(cm.to_dict(), allow_unicode=True)
        cm_json = yaml.load(cm_yaml,Loader=yaml.CLoader)
        return cm_json
    except client.exceptions.ApiException as e:
        print(f"API 오류 발생: {e}")
    configmap_list = []
    configmaps = v1.list_namespaced_config_map(namespace=namespace, label_selector=label_selector)
    try:
        # 조회된 ConfigMap의 이름을 리스트에 추가합니다.
        for cm in configmaps.items:
            name = cm.metadata.name
            configmap_list.append(get_configmap_as_yaml(namespace, name))
        
        # JSON 문자열로 변환하여 응답을 반환합니다.
        # return web.Response(text=configmap_list, content_type='application/json')
            # JSON 문자열로 변환하여 응답을 반환합니다.
        # JSON 문자열로 변환하여 응답을 반환합니다.
        return web.Response(text=json.dumps(configmap_list, default=datetime_converter), content_type='application/json', status=200)
    except client.exceptions.ApiException as e:
        return web.Response(text=json.dumps({'error': str(e)}), status=500)

 

configmap 내부 key를 읽어서 json으로 만들기.

    configmap_list = []
    configmaps = v1.list_namespaced_config_map(namespace=namespace, label_selector=label_selector)
    try:
        # 조회된 ConfigMap의 이름을 리스트에 추가합니다.
        for cm in configmaps.items:
            name = cm.metadata.name
            data = get_configmap_as_yaml(namespace, name)
            # print(data['data'].keys())
            # YAML 형식의 문자열을 JSON 객체로 변환
            for key in data['data'].keys():
                # data['data'][f'{key}'] = yaml.safe_load(data['data'][f'{key}'].replace('$providerType$', 'providerTypePlaceholder'))
                data['data'][f'{key}'] = yaml.safe_load(data['data'][f'{key}'])

yaml이 여러 개인 경우 safe_load_all을 써야 한다.

cm_yaml = yaml.dump(cm.to_dict(), allow_unicode=True)
cm_json = yaml.load(cm_yaml,Loader=yaml.CLoader)
for key in cm_json['data'].keys():
    try:
        if key == 'managed':
            managed_datas = list(yaml.safe_load_all(cm_json['data'][f'{key}']))
            cm_json['data'][f'{key}'] = managed_datas
        else:
            cm_json['data'][f'{key}'] = yaml.safe_load(cm_json['data'][f'{key}'])
    except:
        print(f"key error: {key}")
        continue
반응형
profile

Joo's

@JooJY

포스팅이 좋았다면 "좋아요❤️" 또는 "구독👍🏻" 해주세요!