mesos-docker-executor的运行代码位于src/docker/executor.cpp中,其main函数如下:
-
// Entry point of the Docker executor binary, as excerpted in this article.
// Visible steps: load flags, initialize logging, check required options,
// create the Docker abstraction, construct the DockerExecutor and run it
// through a MesosExecutorDriver.
// NOTE(review): this excerpt appears to have lost lines (braces, some
// conditions and return statements), so it is not compilable as shown.
int main(int argc, char** argv) -
-
GOOGLE_PROTOBUF_VERIFY_VERSION; -
-
mesos::internal::docker::Flags flags; -
-
// Load flags from environment and command line. -
Try<Nothing> load = flags.load(None(), &argc, &argv); -
-
-
// Prints the usage text together with the load error; the condition
// guarding this statement is not visible in the excerpt.
cerr << flags.usage(load.error()) << endl; -
-
-
-
std::cout << stringify(flags) << std::endl; -
-
mesos::internal::logging::initialize(argv[0], flags, true); // Catch signals. -
-
-
// Prints the plain usage text; the guarding condition is not visible here.
cout << flags.usage() << endl; -
-
-
-
std::cout << stringify(flags) << std::endl; -
-
// Each option below is checked for presence; when it is absent, a usage
// message naming the missing option is written to cerr.
if (flags.docker.isNone()) { -
cerr << flags.usage("Missing required option --docker") << endl; -
-
-
-
if (flags.container.isNone()) { -
cerr << flags.usage("Missing required option --container") << endl; -
-
-
-
if (flags.sandbox_directory.isNone()) { -
cerr << flags.usage("Missing required option --sandbox_directory") << endl; -
-
-
-
if (flags.mapped_directory.isNone()) { -
cerr << flags.usage("Missing required option --mapped_directory") << endl; -
-
-
-
if (flags.stop_timeout.isNone()) { -
cerr << flags.usage("Missing required option --stop_timeout") << endl; -
-
-
-
if (flags.launcher_dir.isNone()) { -
cerr << flags.usage("Missing required option --launcher_dir") << endl; -
-
-
-
// The 2nd argument for docker create is set to false so we skip -
// validation when creating a docker abstraction, as the slave -
// should have already validated docker. -
// NOTE(review): only the docker_socket argument of this call is visible;
// the other arguments are missing from the excerpt.
Try<Owned<Docker>> docker = Docker::create( -
-
flags.docker_socket.get(), -
-
-
-
cerr << "Unable to create docker abstraction: " << docker.error() << endl; -
-
-
-
// Construct the executor from the flag values; the leading constructor
// arguments are missing from the excerpt.
mesos::internal::docker::DockerExecutor executor( -
-
-
flags.sandbox_directory.get(), -
flags.mapped_directory.get(), -
flags.stop_timeout.get(), -
flags.launcher_dir.get()); -
-
// Run the executor through the driver; the process exits with EXIT_SUCCESS
// only when run() returns DRIVER_STOPPED, otherwise with EXIT_FAILURE.
mesos::MesosExecutorDriver driver(&executor); -
return driver.run() == mesos::DRIVER_STOPPED ? EXIT_SUCCESS : EXIT_FAILURE; -
|
如上一篇文章对MesosExecutorDriver的分析所述,Mesos-slave向Executor发送消息运行Task时,会调用DockerExecutor的launchTask函数。
-
// Executor callback for launching a task: forwards the driver and the task
// to DockerExecutorProcess::launchTask via dispatch() on `process`.
// NOTE(review): the braces of this method are missing from the excerpt.
virtual void launchTask(ExecutorDriver* driver, const TaskInfo& task) -
-
dispatch(process.get(), &DockerExecutorProcess::launchTask, driver, task); -
|
该函数通过dispatch最终调用DockerExecutorProcess的launchTask函数。
-
// Launches the given task as a Docker container (as excerpted in this
// article). Visible steps: reject the task with TASK_FAILED on the
// "multiple tasks" path, sanity-check the TaskInfo, start the container
// with stdout/stderr file descriptors passed through, register `reaped`
// on the run future, and send TASK_RUNNING once `docker inspect` returns.
// NOTE(review): many lines (braces, declarations, conditions and parts of
// calls) are missing from this excerpt, so it is not compilable as shown.
void launchTask(ExecutorDriver* driver, const TaskInfo& task) -
-
-
-
// Failure path: report TASK_FAILED for this task. Judging by the message
// below, it handles an attempt to run more than one task; the guarding
// condition and the declaration of `status` are not visible here.
status.mutable_task_id()->CopyFrom(task.task_id()); -
status.set_state(TASK_FAILED); -
-
"Attempted to run multiple tasks using a \"docker\" executor"); -
-
driver->sendStatusUpdate(status); -
-
-
-
-
-
-
cout << "Starting task " << taskId.get() << endl; -
-
// The task must carry both container and command info, and the container
// type must be DOCKER.
CHECK(task.has_container()); -
CHECK(task.has_command()); -
-
CHECK(task.container().type() == ContainerInfo::DOCKER); -
-
// We're adding task and executor resources to launch docker since -
// the DockerContainerizer updates the container cgroup limits -
// directly and it expects it to be the sum of both task and -
// executor resources. This does leave to a bit of unaccounted -
// resources for running this executor, but we are assuming -
// this is just a very small amount of overcommit. -
// NOTE(review): the start of the call whose arguments follow is missing
// from the excerpt; presumably it is the call that produces `run`.
-
-
-
-
-
-
task.resources() + task.executor().resources(), -
-
Subprocess::FD(STDOUT_FILENO), -
Subprocess::FD(STDERR_FILENO)); -
-
// Register `reaped` (deferred onto this process) as the callback on the
// run future.
run->onAny(defer(self(), &Self::reaped, driver, lambda::_1)); -
-
// NOTE(review): the comment below looks truncated in this excerpt.
// Delay sending TASK_RUNNING status update until we receive -
-
// Once inspect yields a container, send a TASK_RUNNING update that carries
// the inspect output as data and, when available, the container's IP
// address both as a label and as NetworkInfo.
inspect = docker->inspect(containerName, DOCKER_INSPECT_DELAY) -
.then(defer(self(), [=](const Docker::Container& container) { -
-
-
status.mutable_task_id()->CopyFrom(taskId.get()); -
status.set_state(TASK_RUNNING); -
status.set_data(container.output); -
if (container.ipAddress.isSome()) { -
// TODO(karya): Deprecated -- Remove after 0.25.0 has shipped. -
Label* label = status.mutable_labels()->add_labels(); -
label->set_key("Docker.NetworkSettings.IPAddress"); -
label->set_value(container.ipAddress.get()); -
-
NetworkInfo* networkInfo = -
status.mutable_container_status()->add_network_infos(); -
-
// TODO(CD): Deprecated -- Remove after 0.27.0. -
networkInfo->set_ip_address(container.ipAddress.get()); -
-
NetworkInfo::IPAddress* ipAddress = -
networkInfo->add_ip_addresses(); -
ipAddress->set_ip_address(container.ipAddress.get()); -
-
driver->sendStatusUpdate(status); -
-
-
-
-
-
-
// launchHealthCheck is deferred with containerName and task; the
// expression that triggers it is not visible in this excerpt.
defer(self(), &Self::launchHealthCheck, containerName, task)); -
|
调用Docker函数启动容器。