Secret Manager
애플리케이션은 데이터베이스나 외부 시스템과의 연동을 통해 다양한 복잡한 작업을 수행한다. 특히 마이크로서비스 환경에서는 서비스 간 인증이 필수적이며, 이때 ID, 패스워드, 인증서 키, 인증 토큰 등과 같은 credential이 사용된다. 이러한 자격 증명을 안전하게 관리하는 것은 시스템 보안에서 매우 중요한 요소다.
이를 위해 Secret Manager를 사용하면, 애플리케이션에서 사용하는 민감한 정보를 안전하게 관리할 수 있다. 코드에 직접 자격 증명을 포함시키는 위험을 줄일 수 있으며, 별도의 암호화 시스템을 구현하지 않고도 높은 수준의 보안을 확보할 수 있다.
https://guide.ncloud-docs.com/docs/secretmanager-overview

Secret Manager의 주요 기능 요약
안전한 시크릿 저장
- Key Management Service(KMS)를 사용해 시크릿을 암호화하고 최신 보안 표준으로 보호.
세부적인 접근 제어
- 시크릿별로 접근 권한을 세밀히 설정해 불필요한 노출 방지.
자동 시크릿 갱신
- Cloud Functions로 시크릿을 주기적으로 교체해 보안 사고와 번거로움 최소화.
시크릿 접근 로그 관리
- 시크릿 접근 및 변경 이력을 기록해 투명성과 사고 원인 분석 가능.
다양한 시크릿 형태 지원
- 문자열, 인증서, 토큰 등 모든 시크릿 형태를 유연하게 정의하고 관리 가능
Secret Manager는 주로 데이터베이스 자격 증명을 안전하게 관리하는 데 활용된다. 일반적으로 코드 내 .env 파일을 통해 변수로 관리하던 방식 대신, Secret Manager를 사용하면 OAuth, API Key, 데이터베이스 비밀번호 등의 민감한 정보를 안전하게 저장하고 효율적으로 불러올 수 있다. 이를 위해 Secret Manager API를 통해 애플리케이션에서 필요한 시점에 자격 증명을 호출하는 방식으로 구성할 수 있다.
또한 Secret Manager는 Key Management Service(KMS)와 연동된 암호화 기능을 제공하여 저장된 키 값을 보다 안전하게 보호할 수 있다. 추가적으로 Cloud Function을 활용하면, 주기적으로 데이터베이스 비밀번호를 자동으로 갱신하고 변경된 값을 Secret Manager에 반영할 수 있어, 보안 수준을 더욱 향상시킬 수 있다.
Secret Manager 주요 개념
자격증명과 보안암호
- 자격증명(Credential): ID, 패스워드, 보안암호 등 시스템 접근 권한을 확인하는 데이터.
- Secret Manager는 이러한 자격증명을 안전하게 관리하며 보안암호와 같은 개념으로 취급.
시크릿(Secret)
- Secret Manager에서 관리하는 데이터 단위로, 시크릿 값과 관련 메타 정보를 포함.
- 삭제 시 복구 불가하며, 개인정보를 직접 포함하지 않음.
- 시크릿 값 삭제 후 부수 정보는 최대 3개월 동안 유지 후 파기.
시크릿 생명 주기
- 상태: 이용 가능 → 일시 중지 → 교체 진행 중 → 삭제 예정 → 최종 삭제.
- 삭제 요청 후 7일 대기 후 완전 삭제되며, 최종 삭제 전까지 관리 기능 사용 가능.
시크릿 체인
- 시크릿 값의 교체 이력을 스테이지(Stage) 상태로 시간 순서대로 관리.
시크릿 보호
- Key Management Service(KMS)를 사용해 모든 시크릿을 암호화.
- 기본 키 또는 사용자 관리 키를 통해 보호 가능.
- 봉투 암호화(Envelope Encryption) 방식을 사용해 각 시크릿을 개별적으로 암호화
Secret Manager 의 Secret 값 불러오기 예시
기본 정보 입력

Secret 이름을 지정한 후, 보호 키 옵션에서는 기본 제공되는 Secret Manager Default Key를 사용할 수도 있고, 사용자가 직접 생성한 KMS키를 선택할 수도 있다. 본 실습에서는 기존에 생성해두었던 사용자 정의 KMS 키인 test 키를 선택하여 Secret을 보호하도록 구성하였다.
교체 설정
자동 교체 기능은 Cloud Function을 활용해야만 사용할 수 있다. Cloud Function을 통해 데이터베이스 정보를 자동으로 업데이트할 수 있으며, 이에 대한 코드 예시도 제공된다. 이번 실습에서는 자동 교체 기능은 사용하지 않기로 하였다.
시크릿 교체 액션 예제 코드 URL: https://guide.ncloud-docs.com/docs/secretmanager-rotate-action
import hmac
import hashlib
import base64
import json
import requests
import mysql.connector
from mysql.connector import Error
import time
# Constants
ACCESS_KEY = None
SECRET_KEY = None
BASE_URL = None
JOB_TOKEN = None
def init_requests(secret_id, job_token, access_key, secret_key):
global BASE_URL, JOB_TOKEN, API_KEY, ACCESS_KEY, SECRET_KEY
BASE_URL = f"https://secretmanager.apigw.ntruss.com/action/v1/secrets/{secret_id}/jobs/{job_token}"
JOB_TOKEN = job_token
ACCESS_KEY = access_key
SECRET_KEY = secret_key
def make_signature(method, url, timestamp):
message = f"{method} {url}\n{timestamp}\n{ACCESS_KEY}"
signing_key = SECRET_KEY.encode('utf-8')
mac = hmac.new(signing_key, message.encode('utf-8'), hashlib.sha256)
return base64.b64encode(mac.digest()).decode('utf-8')
def main(args):
secret_id = args["secretId"]
job_token = args["jobToken"]
access_key = args["accessKey"]
secret_key = args["secretKey"]
result = {}
skip_update_pending = False
try:
init_requests(secret_id, job_token, access_key, secret_key)
print(f"[Secret Rotation Job (secret_id={secret_id})]")
secret_value_response = start_rotation()
print("[STEP1 COMPLETE] start rotation")
secret_value = secret_value_response["data"]
secret_chain = secret_value["decryptedSecretChain"]
rotation_targets = secret_value["rotationTargets"]
# Step 2: Generate secret value
if "pending" not in secret_chain or secret_chain["pending"] is None:
pending = {
"cdbHost": json.loads(secret_chain["active"])["cdbHost"],
"cdbPort": json.loads(secret_chain["active"])["cdbPort"],
"cdbUser": json.loads(secret_chain["active"])["cdbUser"],
"cdbDatabase": json.loads(secret_chain["active"])["cdbDatabase"],
"cdbPassword": set_secret_value()
}
secret_chain["pending"] = json.dumps(pending)
print("[STEP2 COMPLETE] update pending")
else:
skip_update_pending = True
print("[STEP2 SIKP] already exist pending")
change_password(secret_chain, rotation_targets)
print("[STEP3 COMPLETE] rotate secret")
test_password(secret_chain, rotation_targets)
print("[STEP4 COMPLETE] test secret")
# Step 5: Update Pending
if skip_update_pending:
print("[STEP5 SKIP] update pending")
else:
update_pending(secret_chain, rotation_targets)
print("[STEP5 COMPLETE] update pending")
complete_rotation()
print("[STEP6 COMPLETE] complete rotation")
result["done"] = True
result["pending"] = secret_chain["pending"]
except Exception as ex:
print(str(ex))
fail_rotation()
result["done"] = False
result["error_message"] = str(ex)
print(json.dumps(result, indent=4))
return result
def start_rotation():
return execute_request("/start", {}, "POST")
def set_secret_value():
req_body = {
"length": 16,
"excludeCharacters": "\"&'+/\\`",
"requireEachIncludedType": True
}
response = execute_request("/generate-random-secret", req_body, "POST")
print(json.dumps(response, indent=4))
return response.get("randomString")
def change_password(secret_chain, rotation_targets):
if not rotation_targets:
raise ValueError("rotationTargets is empty")
rotation_target = rotation_targets[0]
active_value = json.loads(secret_chain["active"])
cdb_host = active_value["cdbHost"]
cdb_database = active_value["cdbDatabase"]
cdb_user = active_value["cdbUser"]
cdb_port = int(active_value["cdbPort"])
print(f"cdbHost: {cdb_host}, cdbDatabase: {cdb_database}, cdbUser: {cdb_user}, cdbPort: {cdb_port}")
if "pending" not in secret_chain:
raise ValueError("error_message: pending is None")
update_password(cdb_host, cdb_port, cdb_user, cdb_database, rotation_target, secret_chain)
def update_password(cdb_host, cdb_port, cdb_user, cdb_database, rotation_target, secret_chain):
active = json.loads(secret_chain["active"])
previous = json.loads(secret_chain.get("previous", "{}"))
pending = json.loads(secret_chain["pending"])
pending_password = pending[rotation_target]
try:
conn = try_connection(cdb_host, cdb_port, cdb_user, pending_password, cdb_database, "pending")
if conn:
conn.close()
return
active_password = active[rotation_target]
conn = try_connection(cdb_host, cdb_port, cdb_user, active_password, cdb_database, "active")
if conn:
update_db_password(conn, cdb_user, pending_password)
return
if "previous" in secret_chain:
previous_password = previous.get(rotation_target)
conn = try_connection(cdb_host, cdb_port, cdb_user, previous_password, cdb_database, "previous")
if conn:
update_db_password(conn, cdb_user, pending_password)
else:
raise ValueError("All Secret Values are not valid")
except Error as ex:
print("[STEP3 ERROR]", ex)
raise
def try_connection(cdb_host, cdb_port, cdb_user, password, cdb_database, password_type):
print(f"Connecting using the {password_type} password")
return db_connect(cdb_host, cdb_port, cdb_user, password, cdb_database)
def update_db_password(conn, cdb_user, new_password):
query = f"ALTER USER '{cdb_user}'@'%' IDENTIFIED BY '{new_password}'"
cursor = conn.cursor()
cursor.execute(query)
conn.commit()
cursor.close()
def test_password(secret_chain, rotation_targets):
rotation_target = rotation_targets[0]
pending_value = json.loads(secret_chain["pending"])
pending_password = pending_value[rotation_target]
active_value = json.loads(secret_chain["active"])
cdb_host = active_value["cdbHost"]
cdb_database = active_value["cdbDatabase"]
cdb_user = active_value["cdbUser"]
cdb_port = int(active_value["cdbPort"])
conn = db_connect(cdb_host, cdb_port, cdb_user, pending_password, cdb_database)
if conn is None:
raise ValueError("error_message: connection failed")
conn.close()
def update_pending(secret_chain, rotation_targets):
rotation_target = rotation_targets[0]
non_change_values = json.loads(secret_chain["active"])
cdb_host = non_change_values["cdbHost"]
cdb_database = non_change_values["cdbDatabase"]
cdb_user = non_change_values["cdbUser"]
cdb_port = non_change_values["cdbPort"]
pending = {
"cdbHost": cdb_host,
"cdbPort": cdb_port,
"cdbUser": cdb_user,
"cdbDatabase": cdb_database,
"cdbPassword": json.loads(secret_chain["pending"])[rotation_target]
}
req_body = {
"value": json.dumps(pending)
}
execute_request("/pending", req_body, "PUT")
def complete_rotation():
execute_request("/complete", {}, "POST")
def fail_rotation():
try:
execute_request("/fail", {}, "POST")
except Exception as ex:
print("[FAIL ROTATION NOTIFICATION ERROR]", ex)
def execute_request(endpoint, request_body, method):
url = BASE_URL + endpoint
print("Request url:", url)
timestamp = str(int(time.time() * 1000))
headers = {
"Content-Type": "application/json",
"x-ncp-apigw-timestamp": timestamp,
"x-ncp-iam-access-key": ACCESS_KEY,
"x-ncp-apigw-signature-v2": make_signature(method, url.replace("https://secretmanager.apigw.ntruss.com", ""), timestamp),
"SECRET-JOB-TOKEN": JOB_TOKEN
}
if method == "POST":
response = requests.post(url, headers=headers, json=request_body)
elif method == "PUT":
response = requests.put(url, headers=headers, json=request_body)
else:
raise ValueError("Invalid HTTP method")
response_body = response.json()
print("Request Body:", request_body)
print("Response Body:", response_body)
if response.status_code != 200:
raise Exception(f"Unexpected response {response.status_code}")
return response_body
def db_connect(cdb_host, cdb_port, cdb_user, cdb_pass, cdb_database):
try:
connection = mysql.connector.connect(
host=cdb_host,
port=cdb_port,
user=cdb_user,
password=cdb_pass,
database=cdb_database
)
print("Database connection established successfully")
return connection
except Error as e:
print("Database connection failed:", e)
return None
Secret 값 입력

다음과 같이 CDB을 만들었다고 가정하고 CDB에 필요한 값들을 Key-Value 형태로 입력해주겠다.
최종확인

이제 Secret이 정상적으로 생성되었다. 이어서 Python 코드를 활용하여 NCP API를 통해 해당 Secret 값을 호출해보도록 하겠다.
Secret Manager Get Value API 호출

생성한 Secret ID를 메모해둔다.
NCP Secret Manager API URL: https://api.ncloud-docs.com/docs/secretmanager-getsecretvalue
import hmac
import hashlib
import base64
import json
import requests
import time
# Constants
ACCESS_KEY = "" # 본인의 액세스 키
SECRET_KEY = "" # 본인의 비밀 키
BASE_URL = "https://secretmanager.apigw.ntruss.com/api/v1/secrets/" # Secret Manager API 기본 URL
# 서명 생성 함수
def make_signature(method, url, timestamp):
# API 요청을 위한 서명 생성
path = url.replace("https://secretmanager.apigw.ntruss.com", "")
message = f"{method} {path}\n{timestamp}\n{ACCESS_KEY}"
signing_key = SECRET_KEY.encode('utf-8')
mac = hmac.new(signing_key, message.encode('utf-8'), hashlib.sha256)
return base64.b64encode(mac.digest()).decode('utf-8')
# 비밀 값 가져오기 함수
def get_secret_value(secret_id):
# Secret Manager API에서 비밀 값을 가져오는 GET 요청
url = BASE_URL + f"{secret_id}/values"
timestamp = str(int(time.time() * 1000)) # 현재 타임스탬프 (밀리초 단위)
headers = {
"Content-Type": "application/json",
"x-ncp-apigw-timestamp": timestamp,
"x-ncp-iam-access-key": ACCESS_KEY,
"x-ncp-apigw-signature-v2": make_signature("GET", url, timestamp),
}
response = requests.get(url, headers=headers)
if response.status_code == 200:
return response.json() # 성공적으로 비밀 값 반환
else:
raise Exception(f"Failed to retrieve secret: {response.status_code} {response.text}")
# 메인 실행 함수
def main():
secret_id = "" # 메모해두었던 secret_id로 수정
try:
# secret_id를 사용하여 비밀 값 가져오기
secret_data = get_secret_value(secret_id)
# 복호화된 비밀 체인 및 회전 대상 추출
decrypted_secret_chain = secret_data["data"]["decryptedSecretChain"]
rotation_targets = secret_data["data"]["rotationTargets"]
# 복호화된 비밀 체인 출력
print(f"Decrypted Secret Chain:")
print(f"Previous: {decrypted_secret_chain.get('previous')}")
print(f"Active: {decrypted_secret_chain.get('active')}")
print(f"Pending: {decrypted_secret_chain.get('pending')}")
# 회전 대상 출력
print("\nRotation Targets:")
for target in rotation_targets:
print(f"- {target}")
except Exception as ex:
print(f"Error: {ex}")
# 호출
main()

로컬에서 Python 함수를 호출한 결과, Secret에 저장해두었던 Key-Value 값들이 정상적으로 출력되는 것을 확인할 수 있었다.
또한 Secret 값은 콘솔에서도 직접 확인할 수 있다. 만약 Cloud Function을 통해 키 값을 자동으로 갱신하도록 설정해두었다면, 해당 Secret은 콘솔에서 상태가 Pending으로 표시되며 새로운 값이 반영되는 것을 확인할 수 있을 것이다.

'NCP' 카테고리의 다른 글
| [NCP] Certificate Manager를 통한 SSL인증서 발급 (0) | 2024.12.31 |
|---|---|
| [NCP] Global DNS 도메인 매핑 (0) | 2024.12.31 |
| [NCP] Cloud Function 기반 Slack 알림을 활용한 Object Storage 자동화 관리 (0) | 2024.12.31 |
| [NCP] SSL VPN (0) | 2024.12.31 |
| [NCP] STS를 통한 임시자격 증명 (0) | 2024.12.31 |