k8s python api二次封装 例子
内容导读
互联网集市收集整理的这篇技术教程文章主要介绍了 k8s Python API 二次封装的例子，小编现在分享给大家，供广大互联网技术从业者学习和参考。文章包含 6629 字，纯文字阅读大概需要 10 分钟。
内容图文
![k8s python api二次封装 例子](/upload/InfoBanner/zyjiaocheng/714/d28b9591b6764213a2f5f97ba21a3e77.jpg)
k8s python api二次封装
pip install kubernetes  # pprint 属于 Python 标准库，无需通过 pip 单独安装
import urllib3
from pprint import pprint
from kubernetes import client
from os import path
import yaml
class K8sApi(object):
    """Convenience wrapper around the official Kubernetes Python client.

    The wrapper groups the most common list/read/create/replace/delete calls
    for Deployments, Nodes, Pods, Services, Ingresses and StatefulSets behind
    short method names. Every method returns whatever the underlying client
    call returns, except ``list_node`` and ``list_pod`` which return plain
    dictionaries.
    """

    def __init__(self, host="https://192.168.100.111:6443", token="token"):
        """Build the API clients used by all other methods.

        Args:
            host: URL of the Kubernetes API server.
            token: Bearer token sent in the ``authorization`` header. The
                default is only the placeholder from the original example
                and has to be replaced with a real service-account token.
        """
        # Certificate verification is switched off below, so silence the
        # InsecureRequestWarning urllib3 would otherwise emit.
        urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
        self.configuration = client.Configuration()
        self.configuration.host = host
        self.configuration.api_key['authorization'] = 'Bearer ' + token
        self.configuration.verify_ssl = False
        self.k8s_apps_v1 = client.AppsV1Api(client.ApiClient(self.configuration))
        self.Api_Instance = client.CoreV1Api(client.ApiClient(self.configuration))
        # NOTE(review): ExtensionsV1beta1Api appears to be missing from recent
        # releases of the kubernetes package - confirm the installed version,
        # or migrate the ingress methods to NetworkingV1Api.
        self.Api_Instance_Extensions = client.ExtensionsV1beta1Api(client.ApiClient(self.configuration))

    @staticmethod
    def _load_manifest(file):
        """Parse a YAML manifest and return the resulting Python object.

        Args:
            file: Manifest path. A relative path is resolved against the
                directory containing this module; an absolute path is used
                as given (``os.path.join`` semantics).
        """
        manifest_path = path.join(path.dirname(__file__), file)
        with open(manifest_path, encoding="utf-8") as f:
            return yaml.safe_load(f)

    ####################################################################################################################
    def list_deployment(self, namespace="default"):
        """Return the Deployments found in ``namespace``."""
        return self.k8s_apps_v1.list_namespaced_deployment(namespace)

    def read_deployment(self, name="nginx-deployment", namespace="default"):
        """Return the Deployment ``name`` from ``namespace``."""
        return self.k8s_apps_v1.read_namespaced_deployment(name, namespace)

    def create_deployment(self, file="deploy-nginx.yaml", namespace="default"):
        """Create a Deployment in ``namespace`` from the YAML manifest ``file``."""
        dep = self._load_manifest(file)
        return self.k8s_apps_v1.create_namespaced_deployment(
            body=dep, namespace=namespace)

    def replace_deployment(self, file="deploy-nginx.yaml", name="nginx-deployment", namespace="default"):
        """Replace the Deployment ``name`` with the YAML manifest ``file``."""
        dep = self._load_manifest(file)
        return self.k8s_apps_v1.replace_namespaced_deployment(name, namespace, body=dep)

    def delete_deployment(self, name="nginx-deployment", namespace="default"):
        """Delete the Deployment ``name`` from ``namespace``."""
        return self.k8s_apps_v1.delete_namespaced_deployment(name, namespace)

    ####################################################################################################################
    def list_node(self):
        """Return a summary of every node, keyed by node name.

        Each value is a dict with the keys ``name``, ``status``, ``ip``,
        ``kubelet_version`` and ``os_image``.
        """
        api_response = self.Api_Instance.list_node()
        data = {}
        for node in api_response.items:
            # The last reported condition decides the status; presumably this
            # is the "Ready" condition - verify against the cluster's output.
            last_condition = node.status.conditions[-1]
            if last_condition.status == "True":
                status = last_condition.type
            else:
                status = "NotReady"
            data[node.metadata.name] = {
                "name": node.metadata.name,
                "status": status,
                "ip": node.status.addresses[0].address,
                "kubelet_version": node.status.node_info.kubelet_version,
                "os_image": node.status.node_info.os_image,
            }
        return data

    def list_pod(self):
        """Return ``{pod name: {"ip": ..., "namespace": ...}}`` for all namespaces.

        Pods are keyed by name only, so pods sharing a name in different
        namespaces overwrite each other in the result.
        """
        api_response = self.Api_Instance.list_pod_for_all_namespaces()
        return {
            pod.metadata.name: {"ip": pod.status.pod_ip, "namespace": pod.metadata.namespace}
            for pod in api_response.items
        }

    def read_pod(self, name="nginx-pod", namespace="default"):
        """Return the Pod ``name`` from ``namespace``."""
        return self.Api_Instance.read_namespaced_pod(name, namespace)

    def create_pod(self, file="pod-nginx.yaml", namespace="default"):
        """Create a Pod in ``namespace`` from the YAML manifest ``file``."""
        dep = self._load_manifest(file)
        return self.Api_Instance.create_namespaced_pod(namespace, body=dep)

    def replace_pod(self, name="nginx-pod", namespace="default"):
        """Re-submit the current definition of the Pod ``name`` as a replacement."""
        # Read from the same namespace that is being replaced; the namespace
        # used to be dropped here, so the read always hit "default".
        dep = self.read_pod(name, namespace)
        return self.Api_Instance.replace_namespaced_pod(name, namespace, body=dep)

    def delete_pod(self, name="nginx-pod", namespace="default"):
        """Delete the Pod ``name`` from ``namespace``."""
        return self.Api_Instance.delete_namespaced_pod(name, namespace)

    ####################################################################################################################
    def list_service(self):
        """Return the Services of all namespaces."""
        return self.Api_Instance.list_service_for_all_namespaces()

    def read_service(self, name="", namespace="default"):
        """Return the Service ``name`` from ``namespace``."""
        return self.Api_Instance.read_namespaced_service(name, namespace)

    def create_service(self, file="service-nginx.yaml", namespace="default"):
        """Create a Service in ``namespace`` from the YAML manifest ``file``."""
        dep = self._load_manifest(file)
        return self.Api_Instance.create_namespaced_service(namespace, body=dep)

    def replace_service(self, name="hequan", namespace="default"):
        """Re-submit the current definition of the Service ``name`` as a replacement."""
        # The body must be the Service itself; this used to fetch a Pod of
        # the same name and send that as the replacement.
        dep = self.read_service(name, namespace)
        return self.Api_Instance.replace_namespaced_service(name, namespace, body=dep)

    def delete_service(self, name="hequan", namespace="default"):
        """Delete the Service ``name`` from ``namespace``."""
        return self.Api_Instance.delete_namespaced_service(name, namespace)

    ####################################################################################################################
    def list_ingress(self):
        """Return the Ingresses of all namespaces."""
        return self.Api_Instance_Extensions.list_ingress_for_all_namespaces()

    def read_ingress(self, name="", namespace="default"):
        """Return the Ingress ``name`` from ``namespace``."""
        return self.Api_Instance_Extensions.read_namespaced_ingress(name, namespace)

    def create_ingress(self, file="ingress-nginx.yaml", namespace="default"):
        """Create an Ingress in ``namespace`` from the YAML manifest ``file``."""
        dep = self._load_manifest(file)
        return self.Api_Instance_Extensions.create_namespaced_ingress(namespace, body=dep)

    def replace_ingress(self, name="", file="ingress-nginx.yaml", namespace="default"):
        """Replace the Ingress ``name`` with the YAML manifest ``file``."""
        dep = self._load_manifest(file)
        return self.Api_Instance_Extensions.replace_namespaced_ingress(
            name=name, namespace=namespace, body=dep)

    def delete_ingress(self, name="hequan", namespace="default"):
        """Delete the Ingress ``name`` from ``namespace``."""
        return self.Api_Instance_Extensions.delete_namespaced_ingress(name, namespace)

    #####################################################################################################################
    def list_stateful(self):
        """Return the StatefulSets of all namespaces."""
        return self.k8s_apps_v1.list_stateful_set_for_all_namespaces()

    def read_stateful(self, name="nginx-deployment", namespace="default"):
        """Return the StatefulSet ``name`` from ``namespace``."""
        return self.k8s_apps_v1.read_namespaced_stateful_set(name, namespace)

    def create_stateful(self, file="deploy-nginx.yaml", namespace="default"):
        """Create a StatefulSet in ``namespace`` from the YAML manifest ``file``."""
        dep = self._load_manifest(file)
        return self.k8s_apps_v1.create_namespaced_stateful_set(
            body=dep, namespace=namespace)

    def replace_stateful(self, file="deploy-nginx.yaml", name="nginx-deployment", namespace="default"):
        """Replace the StatefulSet ``name`` with the YAML manifest ``file``."""
        dep = self._load_manifest(file)
        return self.k8s_apps_v1.replace_namespaced_stateful_set(name, namespace, body=dep)

    def delete_stateful(self, name="nginx-deployment", namespace="default"):
        """Delete the StatefulSet ``name`` from ``namespace``."""
        return self.k8s_apps_v1.delete_namespaced_stateful_set(name, namespace)
####################################################################################################################
if __name__ == '__main__':
    def test():
        """Smoke test: print the Deployments of the default namespace."""
        pprint(K8sApi().list_deployment())

    test()
内容总结
以上是互联网集市为您收集整理的“k8s Python API 二次封装例子”的全部内容，希望本文能够帮您解决在 k8s Python API 二次封装过程中遇到的程序开发问题。如果觉得互联网集市的技术教程内容还不错，欢迎将互联网集市网站推荐给程序员好友。
内容备注
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 gblab@vip.qq.com 举报,一经查实,本站将立刻删除。
内容手机端
扫描二维码推送至手机访问。