반응형
파이썬에서 쿠버네티스 api로 configmap을 읽으면 text로 읽는다.
이걸 yaml으로 읽으려면 yaml.dump를 하면 된다.
cm = v1.read_namespaced_config_map(name=configmap_name, namespace=namespace)
cm_yaml = yaml.dump(cm.to_dict(), allow_unicode=True)
yaml을 json으로 읽으려면 yaml을 다시 json으로 읽으면 된다.
cm_json = yaml.load(cm_yaml,Loader=yaml.CLoader)
def datetime_converter(o):
if isinstance(o, datetime):
return o.isoformat()
raise TypeError("Object of type '%s' is not JSON serializable" % type(o).__name__)
def get_configmap_as_json(namespace, configmap_name):
try:
# 특정 네임스페이스의 ConfigMap을 가져옵니다.
cm = v1.read_namespaced_config_map(name=configmap_name, namespace=namespace)
# 가져온 ConfigMap 객체를 YAML 문자열로 변환합니다.
cm_yaml = yaml.dump(cm.to_dict(), allow_unicode=True)
cm_json = yaml.load(cm_yaml,Loader=yaml.CLoader)
return cm_json
except client.exceptions.ApiException as e:
print(f"API 오류 발생: {e}")
configmap_list = []
configmaps = v1.list_namespaced_config_map(namespace=namespace, label_selector=label_selector)
try:
# 조회된 ConfigMap의 이름을 리스트에 추가합니다.
for cm in configmaps.items:
name = cm.metadata.name
configmap_list.append(get_configmap_as_yaml(namespace, name))
# JSON 문자열로 변환하여 응답을 반환합니다.
# return web.Response(text=configmap_list, content_type='application/json')
# JSON 문자열로 변환하여 응답을 반환합니다.
# JSON 문자열로 변환하여 응답을 반환합니다.
return web.Response(text=json.dumps(configmap_list, default=datetime_converter), content_type='application/json', status=200)
except client.exceptions.ApiException as e:
return web.Response(text=json.dumps({'error': str(e)}), status=500)
configmap 내부 key를 읽어서 json으로 만들기.
configmap_list = []
configmaps = v1.list_namespaced_config_map(namespace=namespace, label_selector=label_selector)
try:
# 조회된 ConfigMap의 이름을 리스트에 추가합니다.
for cm in configmaps.items:
name = cm.metadata.name
data = get_configmap_as_yaml(namespace, name)
# print(data['data'].keys())
# YAML 형식의 문자열을 JSON 객체로 변환
for key in data['data'].keys():
# data['data'][f'{key}'] = yaml.safe_load(data['data'][f'{key}'].replace('$providerType$', 'providerTypePlaceholder'))
data['data'][f'{key}'] = yaml.safe_load(data['data'][f'{key}'])
yaml이 여러 개인 경우 safe_load_all을 써야 한다.
cm_yaml = yaml.dump(cm.to_dict(), allow_unicode=True)
cm_json = yaml.load(cm_yaml,Loader=yaml.CLoader)
for key in cm_json['data'].keys():
try:
if key == 'managed':
managed_datas = list(yaml.safe_load_all(cm_json['data'][f'{key}']))
cm_json['data'][f'{key}'] = managed_datas
else:
cm_json['data'][f'{key}'] = yaml.safe_load(cm_json['data'][f'{key}'])
except:
print(f"key error: {key}")
continue
반응형
'배운 내용 > Kubernetes' 카테고리의 다른 글
[crossplane] 이미 생성된 리소스 observe하기 (0) | 2024.05.16 |
---|---|
[crossplane] vpc, subnet 만들기 (0) | 2024.05.16 |
[crossplane] 모든 MetaResource 확인하기 (0) | 2024.05.16 |
쿠버네티스 포트 포워딩 (1) | 2024.04.24 |
Unable to connect to the server: dial tcp 192.168.49.2:8443: connect: no route to host (0) | 2023.04.12 |