packagepubsubimport("context""fmt""github.com/testcontainers/testcontainers-go""github.com/testcontainers/testcontainers-go/wait")// pubsubContainer represents the pubsub container type used in the moduletypepubsubContainerstruct{testcontainers.ContainerURIstring}// startContainer creates an instance of the pubsub container typefuncstartContainer(ctxcontext.Context)(*pubsubContainer,error){req:=testcontainers.ContainerRequest{Image:"gcr.io/google.com/cloudsdktool/cloud-sdk:367.0.0-emulators",ExposedPorts:[]string{"8085/tcp"},WaitingFor:wait.ForLog("started"),Cmd:[]string{"/bin/sh","-c","gcloud beta emulators pubsub start --host-port 0.0.0.0:8085",},}container,err:=testcontainers.GenericContainer(ctx,testcontainers.GenericContainerRequest{ContainerRequest:req,Started:true,})iferr!=nil{returnnil,err}mappedPort,err:=container.MappedPort(ctx,"8085")iferr!=nil{returnnil,err}hostIP,err:=container.Host(ctx)iferr!=nil{returnnil,err}uri:=fmt.Sprintf("%s:%s",hostIP,mappedPort.Port())return&pubsubContainer{Container:container,URI:uri},nil}
packagepubsubimport("cloud.google.com/go/pubsub""context""google.golang.org/api/option""google.golang.org/grpc""google.golang.org/grpc/credentials/insecure""testing")funcTestPubsub(t*testing.T){ctx:=context.Background()container,err:=startContainer(ctx)iferr!=nil{t.Fatal(err)}// Clean up the container after the test is completet.Cleanup(func(){iferr:=container.Terminate(ctx);err!=nil{t.Fatalf("failed to terminate container: %s",err)}})conn,err:=grpc.Dial(container.URI,grpc.WithTransportCredentials(insecure.NewCredentials()))iferr!=nil{t.Fatal(err)}options:=[]option.ClientOption{option.WithGRPCConn(conn)}client,err:=pubsub.NewClient(ctx,"my-project-id",options...)iferr!=nil{t.Fatal(err)}deferclient.Close()topic,err:=client.CreateTopic(ctx,"greetings")iferr!=nil{t.Fatal(err)}subscription,err:=client.CreateSubscription(ctx,"subscription",pubsub.SubscriptionConfig{Topic:topic})iferr!=nil{t.Fatal(err)}result:=topic.Publish(ctx,&pubsub.Message{Data:[]byte("Hello World")})_,err=result.Get(ctx)iferr!=nil{t.Fatal(err)}// perform assertionsvardata[]bytecctx,cancel:=context.WithCancel(ctx)err=subscription.Receive(cctx,func(ctxcontext.Context,m*pubsub.Message){data=m.Datam.Ack()defercancel()})ifstring(data)!="Hello World"{t.Fatalf("Expected value %s. Got %s.","Hello World",data)}}