CML中的模型共享和MLOps简介

发布日期:2020-12-28 17:23
微信图片_20201228131358
如今机器学习(ML)的应用门槛大大降低,在许多组织许多项目中的使用越来越普遍。但是在模型投产之后,仍会有许多意想不到的挑战。许多企业已成功地将最初的少数模型投入生产,但仍然在努力简化、扩展和优化模型的部署和管控方式,从而在其业务的每个单元中服务于数量越来越多的机器学习场景和用例。事实证明,机器学习最困难的部分实际上不是开始的建模和训练,而是最后一公里:在生产应用程序中有效部署、操作和管控机器学习模型。这最后一公里的挑战可分为三大类:

部署与投产:
在对模型进行培训并准备就绪后,下一步就是将其部署到生产中。通常,由于模型的整个部署和投产流程与前期的特征工程和建模训练等流程(前期无需考虑扩展性等问题)缺乏一致性,人们在此步骤中遇到可伸缩性、可管理和监控等问题。如果仅需生产几个模型,当然采用一次性人工处理的方法来解决问题,但这不是容易扩展的方式。

关键在于注意到许多模型服务和部署工作流具有可重复的样板方面,企业可以使用现代DevOps技术(例如高频灰度部署和微服务体系结构)来实现自动化。这样做可以使ML工程师专注于模型本身,而不是周围的代码和基础结构。

模型监控:
模型可以定义为用于提供预测的软件。它们可以采用多种形式,从基于Python的rest API到R脚本再到像SparkML这样的分布式框架。模型监视软件也并不是什么新鲜事物,并且已有相当长的一段时间来监视诸如响应时间和吞吐量之类的技术性能的工具。

然而,模型在一个重要方面与普通应用程序相比是独特的-它们可以预测周围不断变化的世界。了解模型在生产上如何在功能上发挥作用是一个棘手的问题,大多数企业尚未解决。这是由于模型行为的独特性和复杂性,需要用于监视概念偏移和模型准确性之类的自定义工具。客户需要专用的模型监视解决方案,该解决方案可以灵活地处理模型生命周期和行为的复杂性。

模型管控:
各个行业的数据和机器学习的法律法规环境都在迅速发展。如果您已经将模型投入生产并进行有效监控,那当然不错。但是当生产环境中模型越来越多的时候,您将面临更多复杂的模型治理和管理挑战。几乎所有机器学习的模型治理需求都与数据本身有关-哪些数据可用于某些应用程序(即用于信用评分的受保护类),谁应该能够访问哪些数据以及如何创建模型-都是直接与组织中的数据管理实践联系在一起。当今的企业需要利用其数据治理解决方案将ML作为自然的扩展。

CML中的MLOps和SDX模型:
Cloudera Machine Learning(CML)是Cloudera Data Platform(CDP)上的机器学习端到端工程化平台(CDP之前的年代名字叫做CDSW),除了建模、训练和投产之外,CML也为数据科学家提供了模型监视和治理的功能,同时还解决了我们现有服务基础架构中更关键的用例。此版本的主要功能包括:
ML模型监控服务,指标存储和SDK
在此版本中,我们构建了一流的模型监视服务,旨在以可重复,安全和可扩展的方式解决技术监视(延迟,吞吐量等)以及功能或预测监视。这包括一个可伸缩的度量标准存储库,用于在评分期间和之后捕获模型所需的任何度量标准,用于跟踪单个模型预测的唯一标识符,用于可视化这些度量标准的UI,以及用于跟踪度量标准并使用自定义代码进行分析的Python SDK。
SDX for Models:模型编目、治理和全生命周期的血缘关系
Cloudera的共享数据体验(SDX)是一种旨在在整个数据生命周期内实现整体安全性,治理和合规性的功能,现已扩展到生产环境中的机器学习模型。这意味着可以使用直接从Cloudera数据平台(CDP)继承的访问,治理和安全规则将模型部署到生产中。
此外,用于模型的SDX通过扩展Apache Atlas使其包含模型元数据,从而实现完整的ML生命周期治理,该模型元数据与Atlas众所周知的现有数据治理功能集成在一起。通过与数据本身集成,我们现在可以解决以下问题:解释模型生成方式的能力(例如,使用什么数据训练模型以及数据来自何处),从而向生产环境显示真实,完整的数据源血统。Atlas的灵活性可实现从模型和功能生命周期管理到可解释性和可解释性的广泛功能。 

模型部署的高可用性:
最后,模型通常用于需要高可用性服务以优化正常运行时间的关键任务应用中。Cloudera Machine Learning现在已经能够为多个版本提供实时模型,并且在此版本中,我们消除了单点故障,以确保已部署的模型对其运行的Kubernetes集群具有高可用性。这样可以大大减少问题,并消除生产环境中的意外停机时间。

探索一个例子:
在此示例中,我们将利用现有项目和新功能来监视和管理已部署的模型。该演示旨在了解葡萄酒的特性(例如pH值,酒精含量等),并根据以前的葡萄酒专家的意见,简单地确定葡萄酒对葡萄酒的评价是“好”还是“差”。 

我们想要跟踪该模型的所有输入及其做出的预测,然后分析准确性和偏移。另外,我们希望对该模型进行分类,并了解使用了哪些数据进行训练。

Github存储库:https : //github.com/fastforwardlabs/mlops-wine-quality-demo

设置您的CML项目:
要开始这个示例,我们需要一个有效的项目。第一步是将存储库克隆到新的CML项目中。 


然后,我们需要从S3存储桶中使用CSV文件设置一些Hive表。 
在CML会话终端窗口中:


这将在CDP数据湖中设置并创建两个表,我们将使用它们来训练模型。我们可以看到它们是在Apache Atlas中创建的-我们想要保存完全限定的名称以供以后使用。


分析数据并训练模型:
接下来,您可以通过打开CML会话并运行analyse.py文件来运行样本分析。

通过此分析,我们还获得了“差”和“优秀”的分布,我们将在以后使用。

训练模型:

创建部署模型:
现在,我们希望将此经过训练的模型作为API进行部署,以便可以使用“葡萄酒评估师”网页来确定某酒是“不良”还是“优秀”。我们将在model.py Python应用程序中进行此操作,并利用新的SDK功能来跟踪我对此模型的输入和输出。

spark = SparkSession.builder \

      .appName("wine-quality-model") \

      .master("local[*]") \

      .config("spark.driver.memory","4g")\

      .config("spark.hadoop.fs.s3a.aws.credentials.provider","org.apache.hadoop.fs.s3a.AnonymousAWSCredentialsProvider")\

      .config("spark.hadoop.fs.s3a.metadatastore.impl","org.apache.hadoop.fs.s3a.s3guard.NullMetadataStore")\

      .config("spark.hadoop.fs.s3a.delegation.token.binding","")\

      .config("spark.hadoop.yarn.resourcemanager.principal","csso_abreshears")\

      .getOrCreate()

model = PipelineModel.load("file:///home/cdsw/models/spark")

schema = StructType([StructField("fixedAcidity", DoubleType(), True),     

  StructField("volatileAcidity", DoubleType(), True),     

  StructField("citricAcid", DoubleType(), True),     

  StructField("residualSugar", DoubleType(), True),     

  StructField("chlorides", DoubleType(), True),     

  StructField("freeSulfurDioxide", DoubleType(), True),     

  StructField("totalSulfurDioxide", DoubleType(), True),     

  StructField("density", DoubleType(), True),     

  StructField("pH", DoubleType(), True),     

  StructField("sulphates", DoubleType(), True),     

  StructField("Alcohol", DoubleType(), True)

])


# Decorate predict function with new functionality that 

# 1) sends metrics via the track_metric() method

# 2) adds a unique identifier for tracking the outputs


@cdsw.model_metrics

def predict(args):

  split=args["feature"].split(";")

  features=[list(map(float,split[:11]))]

  features_df = spark.createDataFrame(features, schema)#.collect()

  features_list = features_df.collect()


  # Let's track the inputs to the model

  for x in features_list:

    cdsw.track_metric("fixedAcidity", x["fixedAcidity"])

    cdsw.track_metric("volatileAcidity", x["volatileAcidity"])

    cdsw.track_metric("citricAcid", x["citricAcid"])

    cdsw.track_metric("residualSugar", x["residualSugar"])

    cdsw.track_metric("chlorides", x["chlorides"])

    cdsw.track_metric("freeSulfurDioxide", x["freeSulfurDioxide"])

    cdsw.track_metric("totalSulfurDioxide", x["totalSulfurDioxide"])

    cdsw.track_metric("density", x["density"])

    cdsw.track_metric("pH", x["pH"])

    cdsw.track_metric("sulphates", x["sulphates"])

    cdsw.track_metric("Alcohol", x["Alcohol"])


  resultdf=model.transform(features_df).toPandas()["prediction"][0]


  if resultdf == 1.0:

    to_return = {"result": "Poor"}

  else:

    to_return = {"result" : "Excellent"}

  # Let's track the prediction we're making

  cdsw.track_metric("prediction", to_return["result"])

  return to_return


# pre-heat the model

predict({"feature": "7.4;0.7;0;1.9;0.076;11;34;0.9978;3.51;0.56;9.4"}) #bad

predict({"feature": "7.3;0.65;0.0;1.2;0.065;15.0;21.0;0.9946;3.39;0.47;10.0"}) #good

time.sleep(1)

predict({"feature": "7.4;0.7;0;1.9;0.076;11;34;0.9978;3.51;0.56;9.4"}) #bad

predict({"feature": "7.3;0.65;0.0;1.2;0.065;15.0;21.0;0.9946;3.39;0.47;10.0"}) #good

time.sleep(2)

predict({"feature": "7.4;0.7;0;1.9;0.076;11;34;0.9978;3.51;0.56;9.4"}) #bad

predict({"feature": "7.3;0.65;0.0;1.2;0.065;15.0;21.0;0.9946;3.39;0.47;10.0"}) #good

time.sleep(1)

predict({"feature": "7.4;0.7;0;1.9;0.076;11;34;0.9978;3.51;0.56;9.4"}) #bad

predict({"feature": "7.3;0.65;0.0;1.2;0.065;15.0;21.0;0.9946;3.39;0.47;10.0"}) #good

predict({"feature": "7.4;0.7;0;1.9;0.076;11;34;0.9978;3.51;0.56;9.4"}) #bad

time.sleep(3)

predict({"feature": "7.3;0.65;0.0;1.2;0.065;15.0;21.0;0.9946;3.39;0.47;10.0"}) #good

predict({"feature": "7.4;0.7;0;1.9;0.076;11;34;0.9978;3.51;0.56;9.4"}) #bad

time.sleep(1)

predict({"feature": "7.3;0.65;0.0;1.2;0.065;15.0;21.0;0.9946;3.39;0.47;10.0"}) #good

predict({"feature": "7.4;0.7;0;1.9;0.076;11;34;0.9978;3.51;0.56;9.4"}) #bad

time.sleep(1)

predict({"feature": "7.3;0.65;0.0;1.2;0.065;15.0;21.0;0.9946;3.39;0.47;10.0"}) #good

predict({"feature": "7.4;0.7;0;1.9;0.076;11;34;0.9978;3.51;0.56;9.4"}) #bad

predict({"feature": "7.3;0.65;0.0;1.2;0.065;15.0;21.0;0.9946;3.39;0.47;10.0"}) #good

predict({"feature": "7.4;0.7;0;1.9;0.076;11;34;0.9978;3.51;0.56;9.4"}) #bad

predict({"feature": "7.3;0.65;0.0;1.2;0.065;15.0;21.0;0.9946;3.39;0.47;10.0"}) #good

predict({"feature": "7.4;0.7;0;1.9;0.076;11;34;0.9978;3.51;0.56;9.4"}) #bad

predict({"feature": "7.3;0.65;0.0;1.2;0.065;15.0;21.0;0.9946;3.39;0.47;10.0"}) #good

predict({"feature": "7.4;0.7;0;1.9;0.076;11;34;0.9978;3.51;0.56;9.4"}) #bad

predict({"feature": "7.3;0.65;0.0;1.2;0.065;15.0;21.0;0.9946;3.39;0.47;10.0"}) #good

predict({"feature": "7.4;0.7;0;1.9;0.076;11;34;0.9978;3.51;0.56;9.4"}) #bad

predict({"feature": "7.3;0.65;0.0;1.2;0.065;15.0;21.0;0.9946;3.39;0.47;10.0"}) #good

predict({"feature": "7.4;0.7;0;1.9;0.076;11;34;0.9978;3.51;0.56;9.4"}) #bad

predict({"feature": "7.3;0.65;0.0;1.2;0.065;15.0;21.0;0.9946;3.39;0.47;10.0"}) #good

设置血缘关系:
作为部署模型的一部分,我们希望在新模型目录中捕获有关它的元数据。将自动捕获在CML中创建的资产,例如项目,模型构建和模型部署。但是,我们想知道此特定模型的训练数据。由于训练是迭代的,并且我们可能不想捕获所有中间步骤,因此我们将创建一个lineage.yml文件,该文件定义了我用来训练模型的表。这将在Apache Atlas中进行选择和解析,以显示数据以对部署沿袭进行建模。 

{
  "Predict w/ Monitoring & Lineage": {
    "hive_table_qualified_names": ["default.wineds_ext_nolabel@cm", "default.wineds_ext@cm"]
  }
}

部署模型:
现在,我们将使用CML的模型功能将模型部署为其余API。


现在,我们可以使用CML测试REST API。请注意我们可以在下游流程中使用的预测uuid,以便将预测与后续操作相关联。例如,我们可以使用一个Web应用程序,葡萄酒专家可以选择一种葡萄酒,如果他们认为尝试过的葡萄酒是“差”或“优秀”。我们稍后可以将其用于准确性报告。 

使用以下示例作为输入示例,以简化部署后的模型测试:
{"feature": "7.4;0.7;0;1.9;0.076;11;34;0.9978;3.51;0.56;9.4"}

最后,在“模型”屏幕上,我们还为部署的模型获得了唯一的Cloudera资源名称(CRN),称为“模型部署CRN”。它看起来类似于: 
crn:cdp:ml:us-west-1:9d74eee4-1cad-45d7-b645-7ccf9edbb73d:workspace:c4b02aca-fcae-4440-9acc-c38c2d6a7d2c/b1c02929-6cc7-424d-b92f-a169f9f395fe

分析指标:
现在,我们要分析模型在技术(延迟)上和功能(偏移)上的性能。我们将使用Python SDK进行此分析。不过,首先,我们要设置一个每30分钟运行一次的CML作业,与训练集相比,该作业将对模型的输出进行卡方检验(“优秀”为50%,“较差”为50%) 。

measure_drift.py:
#计算偏移量并提交给MLOps进行进一步分析
import cdsw, time, os

import json

import pandas as pd

import matplotlib.pyplot as plt

import numpy as np

from scipy.stats import chisquare

# Define our uqique model deployment id

model_deployment_crn = "crn:cdp:ml:us-west-1:12a0079b-1591-4ca0-b721-a446bda74e67:workspace:ec3efe6f-c4f5-4593-857b-a80698e4857e/d5c3fbbe-d604-4f3b-b98a-227ecbd741b4"

# Define our training distribution for 

training_distribution_percent = pd.DataFrame({"Excellent": [0.50], "Poor": [0.50]})

training_distribution_percent

current_timestamp_ms = int(round(time.time() * 1000))

known_metrics = cdsw.read_metrics(model_deployment_crn=model_deployment_crn,

            start_timestamp_ms=0,

            end_timestamp_ms=current_timestamp_ms)  

df = pd.io.json.json_normalize(known_metrics["metrics"])

df.tail()

# Test if current distribution is different than training data set

prediction_dist_series = df.groupby(df["metrics.prediction"]).describe()["metrics.Alcohol"]["count"]

prediction_dist_series

x2, pv = chisquare([(training_distribution_percent["Poor"] * len(df))[0], \

                    (training_distribution_percent["Excellent"] * len(df))[0]],\

                   [prediction_dist_series[0], prediction_dist_series[1]])

print(x2, pv)

# Put it back into MLOps for Tracking

cdsw.track_aggregate_metrics({"chisq_x2": x2, "chisq_p": pv}, current_timestamp_ms, current_timestamp_ms, model_deployment_crn=model_deployment_crn)

现在,让我们安排它使用CML作业每30分钟运行一次。

现在,我们可以使用analyst_metrics.py文件使用pandas和matplotlib分析指标。

# Performs custom analytics on desired metrics.

# Performs custom analytics on desired metrics.

import cdsw, time, os

import json

import pandas as pd

import matplotlib.pyplot as plt

import numpy as np

from scipy.stats import chisquare

# Define our uqique model deployment id

model_deployment_crn = "crn:cdp:ml:us-west-1:12a0079b-1591-4ca0-b721-a446bda74e67:workspace:ec3efe6f-c4f5-4593-857b-a80698e4857e/d5c3fbbe-d604-4f3b-b98a-227ecbd741b4"

# Define our training distribution for

training_distribution_percent = pd.DataFrame({"Excellent": [0.50], "Poor": [0.50]})

training_distribution_percent

current_timestamp_ms = int(round(time.time() * 1000))

known_metrics = cdsw.read_metrics(model_deployment_crn=model_deployment_crn,

            start_timestamp_ms=0,

            end_timestamp_ms=current_timestamp_ms)

df = pd.io.json.json_normalize(known_metrics["metrics"])

# Do some conversions & Calculations

df['startTimeStampMs'] = pd.to_datetime(df['startTimeStampMs'], unit='ms')

df['endTimeStampMs'] = pd.to_datetime(df['endTimeStampMs'], unit='ms')

df["processing_time"] = (df["endTimeStampMs"] - df["startTimeStampMs"]).dt.microseconds * 1000

non_agg_metrics = df.dropna(subset=["metrics.prediction"])

non_agg_metrics.tail()

# Visualize the processing time

non_agg_metrics.plot(kind='line', x='predictionUuid', y='processing_time')

# Visualize the output distribution

prediction_dist_series = non_agg_metrics.groupby(non_agg_metrics["metrics.prediction"]).describe()["metrics.Alcohol"]["count"]

prediction_dist_series.plot("bar")

# Visualize chi squared from my bi-hourly run

chi_sq_metrics = df.dropna(subset=["metrics.chisq_x2"])

chi_sq_metrics.plot(kind='line', x='endTimeStampMs', y=['metrics.chisq_x2', 'metrics.chisq_p']

部分输出:
从该图可以确定卡方检验是否随时间改变其输出,这意味着该模型可能正在偏移。



然后,我可以查看预测的分布,这使我看到我们开始比训练数据集获得更多的“优秀”。稍后我可能想进一步深入研究。

最后,我想了解一下REST服务随时间的延迟。我可以看到,当首次部署模型时,处理请求花费了更长的时间,但是随着时间的推移已经趋于平稳。


使用模型目录:
现在我们已经能够部署和监视模型,我们希望利用模型目录来确定用于训练该模型的表。我们将在CDP中为Data Lake打开Apache Atlas,并寻找该模型的模型部署。您会注意到,Atlas自动捕获对象和来自CML的历史记录以进行跟踪。

找到模型部署后,我们将单击“ Lineage”选项卡,然后可以看到从原始S3存储桶到Hive表,再到正在构建和部署的模型–帮助说明模型的生成方式。

为企业生产ML的未来铺平道路:
Cloudera Machine Learning的MLOps功能和ML模型的SDX为生产机器学习工作流程提供开放,标准化和灵活的工具。无论您是开始ML之旅还是希望将ML用例扩展到成百上千,CML都会为整个企业的端到端生产ML提供唯一的混合云原生机器学习平台。 

Cloudera Machine Learning(CML)可以使用一组扩展的MLOps生产机器学习功能。组织可以使用CML的新MLOps功能和用于模型的Cloudera SDX来管理和保护ML生命周期,以进行生产机器学习。数据科学家,机器学习工程师和操作员可以在一个统一的解决方案中进行协作,从而大大缩短了实现价值的时间,并将生产机器学习模型的业务风险降至最低。

Blue Badge Insights的创始人兼首席执行官Andrew Brust表示:“已经超过采用机器学习的试验阶段的公司正在寻求将生产部署扩展到整个业务的数百甚至数千个ML模型。“如此规模的管理,监视和治理模型绝非定制过程。借助真正的ML运营平台,公司可以使AI成为其数字化转型业务的关键任务组件。”

具有新的MLOps功能的Cloudera机器学习版本和用于模型的Cloudera SDX发行,提供了一组基本的模型和生命周期管理功能,以实现可伸缩,透明和受管的方法,这些方法可用于扩展模型部署和ML用例。

好处包括:
  • 独特的模型分类和沿袭功能使您可以查看整个ML生命周期,从而消除孤岛和盲点,从而实现整个生命周期的透明性,可解释性和责任感。
  • 完整的端到端机器学习生命周期管理,包括将机器学习模型安全地部署到生产环境,确保准确性和扩展用例所需的一切。
  • 一流的模型监视服务,旨在以可重复,安全和可扩展的方式跟踪和监视技术方面以及预测的准确性。
  • 建立在100%开源标准之上,并与Cloudera Data Platform完全集成,使客户可以集成到现有和将来的工具中,而不必局限于单个供应商。

“ Cloudera一直在我们的行业中与一些最大的客户和合作伙伴合作,为机器学习元数据建立开放标准,” Cloudera首席产品官Arun Murthy说。“我们已经将这些标准实施为Cloudera机器学习的一部分,以提供企业在大规模生产中部署和维持机器学习模型所需的一切。凭借一流的模型部署,安全性,治理和监视,这是首个端到端ML解决方案,可实现对整个生命周期的管理,从数据到ML驱动的跨混合云和多云业务影响。”

Cloudera Machine Learning(CML)中可用的扩展的生产机器学习功能集包括:
  • MLOps的新功能可监视机器学习模型的功能和业务绩效:
    • 利用本机存储并访问自定义和任意模型指标,检测模型性能并随时间推移进行漂移。
    • 测量并跟踪单个预测的准确性,确保模型符合要求并实现最佳性能。

  • 用于模型的Cloudera SDX扩展了SDX治理功能,现在支持模型:
    • 通过模型分类,完整的生命周期沿袭以及Apache Atlas中的自定义元数据,跟踪,管理和了解整个企业中部署的大量ML模型。
    • 查看与在单个系统中构建和部署的模型有关的数据沿袭,以帮助管理和控制ML生命周期。
    • 增强了模型REST端点的模型安全性,从而允许在CML生产环境中为模型提供服务而不会损害安全性。
分享到:
推荐精彩博文